diff --git a/README.md b/README.md
index aaf4512..350b3bd 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@ This is a typical pattern with DataFusion, as files in object storage usually ar
 Here we walk through a hypothetical example of setting up a materialized view, to illustrate what this library offers.
 
 The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function),
-called `file_dependencies`, that outputs a build graph for a materialized view. This gives users the information they need to determine
+called `mv_dependencies`, that outputs a build graph for a materialized view. This gives users the information they need to determine
 when partitions of the materialized view need to be recomputed.
 
 ```sql
@@ -39,8 +39,8 @@ CREATE MATERIALIZED VIEW m1 AS SELECT
 PARTITIONED BY (year)
 LOCATION 's3://m1/';
 
--- Show the dependency graph for m1 using the file_dependencies UDTF
-SELECT * FROM file_dependencies('m1');
+-- Show the dependency graph for m1 using the mv_dependencies UDTF
+SELECT * FROM mv_dependencies('m1');
 
 +--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
 | target             | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |
diff --git a/src/lib.rs b/src/lib.rs
index e8f2c77..2ccb398 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -36,7 +36,7 @@
 /// otherwise the tables may not be detected by the incremental view maintenance code,
 /// including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata),
 /// [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry), or the
-/// [`file_dependencies`](materialized::dependencies::file_dependencies) UDTF.
+/// [`mv_dependencies`](materialized::dependencies::mv_dependencies) UDTF.
 ///
 /// By default, `ListingTableLike` is implemented for [`ListingTable`](datafusion::datasource::listing::ListingTable),
 pub mod materialized;
diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs
index e0baebd..0b41ec8 100644
--- a/src/materialized/dependencies.rs
+++ b/src/materialized/dependencies.rs
@@ -18,7 +18,7 @@ use datafusion::{
     catalog::CatalogProviderList,
     config::{CatalogOptions, ConfigOptions},
-    datasource::{function::TableFunctionImpl, TableProvider, ViewTable},
+    datasource::{function::TableFunctionImpl, provider_as_source, TableProvider, ViewTable},
     prelude::{flatten, get_field, make_array},
 };
 use datafusion_common::{
@@ -27,7 +27,9 @@ use datafusion_common::{
     tree_node::{Transformed, TreeNode},
     DFSchema, DataFusionError, Result, ScalarValue,
 };
-use datafusion_expr::{col, lit, utils::split_conjunction, Expr, LogicalPlan, TableScan};
+use datafusion_expr::{
+    col, lit, utils::split_conjunction, Expr, LogicalPlan, LogicalPlanBuilder, TableScan,
+};
 use datafusion_functions::string::expr_fn::{concat, concat_ws};
 use datafusion_optimizer::{analyzer::expand_wildcard_rule::ExpandWildcardRule, AnalyzerRule};
 use datafusion_sql::TableReference;
@@ -41,7 +43,7 @@ use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Mater
 /// A table function that shows build targets and dependencies for a materialized view:
 ///
 /// ```ignore
-/// fn file_dependencies(table_ref: Utf8) -> Table
+/// fn mv_dependencies(table_ref: Utf8) -> Table
 /// ```
 ///
 /// `table_ref` should point to a table provider registered for the current session
@@ -50,7 +52,7 @@ use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Mater
 /// # Example
 ///
 /// ```sql
-/// SELECT * FROM file_dependencies('datafusion.public.m1');
+/// SELECT * FROM mv_dependencies('datafusion.public.m1');
 ///
 /// +--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
 /// | target                         | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |
@@ -60,7 +62,7 @@ use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Mater
 /// | s3://m1/partition_column=2023/ | datafusion           | public              | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |
 /// +--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
 /// ```
-pub fn file_dependencies(
+pub fn mv_dependencies(
     catalog_list: Arc<dyn CatalogProviderList>,
     row_metadata_registry: Arc<RowMetadataRegistry>,
     options: &ConfigOptions,
@@ -75,19 +77,19 @@
 #[derive(Debug)]
 struct FileDependenciesUdtf {
     catalog_list: Arc<dyn CatalogProviderList>,
-    config_options: ConfigOptions,
     row_metadata_registry: Arc<RowMetadataRegistry>,
+    config_options: ConfigOptions,
 }
 
 impl FileDependenciesUdtf {
     fn new(
         catalog_list: Arc<dyn CatalogProviderList>,
         row_metadata_registry: Arc<RowMetadataRegistry>,
-        options: &ConfigOptions,
+        config_options: &ConfigOptions,
     ) -> Self {
         Self {
             catalog_list,
-            config_options: options.clone(),
+            config_options: config_options.clone(),
             row_metadata_registry,
         }
     }
@@ -106,29 +108,124 @@ impl TableFunctionImpl for FileDependenciesUdtf {
             .map_err(|e| DataFusionError::Plan(e.to_string()))?;
 
         let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(
-            "file_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized".to_string(),
+            format!("mv_dependencies: table '{table_name}' is not a materialized view. (Materialized TableProviders must be registered using register_materialized)"),
         ))?;
 
         Ok(Arc::new(ViewTable::try_new(
-            file_dependencies_plan(mv, self.row_metadata_registry.clone(), &self.config_options)?,
+            mv_dependencies_plan(mv, self.row_metadata_registry.clone(), &self.config_options)?,
             None,
         )?))
     }
 }
 
+/// A table function that shows which files need to be regenerated.
+/// It checks `last_modified` timestamps from the file metadata table
+/// and deems a target stale if any of its sources are newer than it.
+///
+/// # `file_metadata`
+///
+/// Accepts a [`TableProvider`] whose schema matches that of [`FileMetadata`](super::file_metadata::FileMetadata).
+/// Normally a `FileMetadata` is passed in directly, but custom file metadata sources
+/// or mock data can be supplied with a user-provided `TableProvider`.
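+///
+/// # Example
+///
+/// Hypothetical output for a materialized view `m1` (paths and timestamps are
+/// illustrative, mirroring this crate's tests):
+///
+/// ```sql
+/// SELECT * FROM stale_files('datafusion.public.m1');
+///
+/// +--------------------------------+----------------------+-----------------------+----------+
+/// | target                         | target_last_modified | sources_last_modified | is_stale |
+/// +--------------------------------+----------------------+-----------------------+----------+
+/// | s3://m1/partition_column=2021/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |
+/// | s3://m1/partition_column=2022/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:22   | true     |
+/// +--------------------------------+----------------------+-----------------------+----------+
+/// ```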
+pub fn stale_files(
+    catalog_list: Arc<dyn CatalogProviderList>,
+    row_metadata_registry: Arc<RowMetadataRegistry>,
+    file_metadata: Arc<dyn TableProvider>,
+    config_options: &ConfigOptions,
+) -> Arc<dyn TableFunctionImpl> {
+    Arc::new(StaleFilesUdtf {
+        mv_dependencies: FileDependenciesUdtf {
+            catalog_list,
+            row_metadata_registry,
+            config_options: config_options.clone(),
+        },
+        file_metadata,
+    })
+}
+
+#[derive(Debug)]
+struct StaleFilesUdtf {
+    mv_dependencies: FileDependenciesUdtf,
+    file_metadata: Arc<dyn TableProvider>,
+}
+
+impl TableFunctionImpl for StaleFilesUdtf {
+    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        use datafusion::prelude::*;
+        use datafusion_functions_aggregate::min_max::max;
+
+        let dependencies = provider_as_source(self.mv_dependencies.call(args)?);
+
+        let table_name = get_table_name(args)?;
+
+        let table_ref = TableReference::from(table_name).resolve(
+            &self.mv_dependencies.config_options.catalog.default_catalog,
+            &self.mv_dependencies.config_options.catalog.default_schema,
+        );
+
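+        // Plan sketch: aggregate the dependency graph to the newest source
+        // timestamp per expected target, left-join the newest existing file per
+        // target directory (file paths are trimmed to their parent directory, e.g.
+        // 's3://m1/partition_column=2021/data.01.parquet' becomes
+        // 's3://m1/partition_column=2021/'), then compare timestamps. A target
+        // with no existing files coalesces to epoch 0 and is therefore always stale.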
+        let logical_plan =
+            LogicalPlanBuilder::scan_with_filters("dependencies", dependencies, None, vec![])?
+                .aggregate(
+                    vec![col("dependencies.target").alias("expected_target")],
+                    vec![max(col("source_last_modified")).alias("sources_last_modified")],
+                )?
+                .join(
+                    LogicalPlanBuilder::scan_with_filters(
+                        "file_metadata",
+                        provider_as_source(
+                            Arc::clone(&self.file_metadata) as Arc<dyn TableProvider>
+                        ),
+                        None,
+                        vec![
+                            col("table_catalog").eq(lit(table_ref.catalog.as_ref())),
+                            col("table_schema").eq(lit(table_ref.schema.as_ref())),
+                            col("table_name").eq(lit(table_ref.table.as_ref())),
+                        ],
+                    )?
+                    .aggregate(
+                        vec![
+                            // Trim the final path element
+                            regexp_replace(col("file_path"), lit(r"/[^/]*$"), lit("/"), None)
+                                .alias("existing_target"),
+                        ],
+                        vec![max(col("last_modified")).alias("target_last_modified")],
+                    )?
+                    .project(vec![col("existing_target"), col("target_last_modified")])?
+                    .build()?,
+                    JoinType::Left,
+                    (vec!["expected_target"], vec!["existing_target"]),
+                    None,
+                )?
+                .project(vec![
+                    col("expected_target").alias("target"),
+                    col("target_last_modified"),
+                    col("sources_last_modified"),
+                    coalesce(vec![
+                        col("target_last_modified"),
+                        lit(ScalarValue::TimestampNanosecond(Some(0), None)),
+                    ])
+                    .lt(col("sources_last_modified"))
+                    .alias("is_stale"),
+                ])?
+                .build()?;
+
+        Ok(Arc::new(ViewTable::try_new(logical_plan, None)?))
+    }
+}
+
 /// Extract table name from args passed to TableFunctionImpl::call()
 fn get_table_name(args: &[Expr]) -> Result<&String> {
     match &args[0] {
         Expr::Literal(ScalarValue::Utf8(Some(table_name))) => Ok(table_name),
         _ => Err(DataFusionError::Plan(
-            "expected a single string literal argument to file_dependencies".to_string(),
+            "expected a single string literal argument to mv_dependencies".to_string(),
         )),
     }
 }
 
 /// Returns a logical plan that, when executed, lists expected build targets
 /// for this materialized view, together with the dependencies for each target.
-pub fn file_dependencies_plan(
+pub fn mv_dependencies_plan(
     materialized_view: &dyn Materialized,
     row_metadata_registry: Arc<RowMetadataRegistry>,
     config_options: &ConfigOptions,
@@ -758,7 +855,7 @@ mod test {
         ListingTableLike, Materialized,
     };
 
-    use super::file_dependencies;
+    use super::{mv_dependencies, stale_files};
 
     /// A mock materialized view.
     #[derive(Debug)]
@@ -927,7 +1024,7 @@
         let metadata_table = ctx.table_provider("file_metadata").await?;
 
         let object_store_metadata_source = Arc::new(
-            ObjectStoreRowMetadataSource::with_file_metadata(metadata_table),
+            ObjectStoreRowMetadataSource::with_file_metadata(Arc::clone(&metadata_table)),
         );
 
         for r in [t1_ref, t2_ref, t3_ref] {
@@ -938,14 +1035,24 @@
         }
 
         ctx.register_udtf(
-            "file_dependencies",
-            file_dependencies(
-                ctx.state().catalog_list().clone(),
+            "mv_dependencies",
+            mv_dependencies(
+                Arc::clone(ctx.state().catalog_list()),
                 row_metadata_registry.clone(),
                 ctx.copied_config().options(),
            ),
         );
 
+        ctx.register_udtf(
+            "stale_files",
+            stale_files(
+                Arc::clone(ctx.state().catalog_list()),
+                Arc::clone(&row_metadata_registry),
+                metadata_table,
+                ctx.copied_config().options(),
+            ),
+        );
+
         Ok(ctx)
     }
@@ -960,6 +1067,7 @@
         file_extension: &'static str,
         expected_output: Vec<&'static str>,
         file_metadata: &'static str,
+        expected_stale_files_output: Vec<&'static str>,
     }
 
     let cases = &[
@@ -986,6 +1094,15 @@
                 ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/data.01.parquet', '2023-07-10T16:00:00Z', 0),
                 ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)
             ",
+            expected_stale_files_output: vec![
+                "+--------------------------------+----------------------+-----------------------+----------+",
+                "| target                         | target_last_modified | sources_last_modified | is_stale |",
+                "+--------------------------------+----------------------+-----------------------+----------+",
+                "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
+                "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:22   | true     |",
+                "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                "+--------------------------------+----------------------+-----------------------+----------+",
+            ],
         },
         TestCase {
             name: "transform year/month/day partition into timestamp partition",
@@ -1018,6 +1135,18 @@
                 ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-05T00:00:00/feed=Y/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                 ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-06T00:00:00/feed=Z/data.01.parquet', '2023-07-10T16:00:00Z', 0)
             ",
+            expected_stale_files_output: vec![
+                "+-----------------------------------------------+----------------------+-----------------------+----------+",
+                "| target                                        | target_last_modified | sources_last_modified | is_stale |",
+                "+-----------------------------------------------+----------------------+-----------------------+----------+",
+                "| s3://m2/timestamp=2023-01-01T00:00:00/feed=A/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
+                "| s3://m2/timestamp=2023-01-02T00:00:00/feed=B/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
+                "| s3://m2/timestamp=2023-01-03T00:00:00/feed=C/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
+                "| s3://m2/timestamp=2024-12-04T00:00:00/feed=X/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
+                "| s3://m2/timestamp=2024-12-05T00:00:00/feed=Y/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
+                "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
+                "+-----------------------------------------------+----------------------+-----------------------+----------+",
+            ],
         },
         TestCase {
             name: "materialized view has no partitions",
@@ -1037,6 +1166,13 @@
             file_metadata: "
                 ('datafusion', 'test', 'm3', 's3://m3/data.01.parquet', '2023-07-12T16:00:00Z', 0)
             ",
+            expected_stale_files_output: vec![
+                "+----------+----------------------+-----------------------+----------+",
+                "| target   | target_last_modified | sources_last_modified | is_stale |",
+                "+----------+----------------------+-----------------------+----------+",
+                "| s3://m3/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                "+----------+----------------------+-----------------------+----------+",
+            ],
         },
         TestCase {
             name: "simple equijoin on year",
@@ -1063,6 +1199,14 @@
                 ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                 ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
             ",
+            expected_stale_files_output: vec![
+                "+--------------------+----------------------+-----------------------+----------+",
+                "| target             | target_last_modified | sources_last_modified | is_stale |",
+                "+--------------------+----------------------+-----------------------+----------+",
+                "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                "+--------------------+----------------------+-----------------------+----------+",
+            ],
         },
         TestCase {
             name: "triangular join on year",
@@ -1097,6 +1241,14 @@
                 ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                 ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
             ",
+            expected_stale_files_output: vec![
+                "+--------------------+----------------------+-----------------------+----------+",
+                "| target             | target_last_modified | sources_last_modified | is_stale |",
+                "+--------------------+----------------------+-----------------------+----------+",
+                "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                "+--------------------+----------------------+-----------------------+----------+",
+            ],
         },
         TestCase {
             name: "triangular left join, strict <",
@@ -1129,6 +1281,14 @@
                 ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                 ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
             ",
+            expected_stale_files_output: vec![
+                "+--------------------+----------------------+-----------------------+----------+",
+                "| target             | target_last_modified | sources_last_modified | is_stale |",
+                "+--------------------+----------------------+-----------------------+----------+",
+                "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
+                "+--------------------+----------------------+-----------------------+----------+",
+            ],
         },
     ];
@@ -1183,7 +1343,7 @@
 
         let df = context
             .sql(&format!(
-                "SELECT * FROM file_dependencies('{}', 'v2')",
+                "SELECT * FROM mv_dependencies('{}', 'v2')",
                 case.table_name,
             ))
             .await
@@ -1193,6 +1353,18 @@
 
         assert_batches_sorted_eq!(case.expected_output, &df.collect().await?);
 
+        let df = context
+            .sql(&format!(
+                "SELECT * FROM stale_files('{}', 'v2')",
+                case.table_name
+            ))
+            .await
+            .map_err(|e| e.context("get stale files"))?;
+        df.clone().explain(false, false)?.show().await?;
+        df.clone().show().await?;
+
+        assert_batches_sorted_eq!(case.expected_stale_files_output, &df.collect().await?);
+
         Ok(())
     }
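
A sketch of how a maintenance job might consume the new UDTF (illustrative SQL; the view name is hypothetical, while the `target` and `is_stale` columns come from the output shown above): select only the partitions flagged as stale and recompute those.

```sql
-- Recompute only the partitions that stale_files reports as stale
SELECT target
FROM stale_files('datafusion.public.m1')
WHERE is_stale;
```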