From b7974e8e248b474b61eb2c5e2c2730ade3cd5c57 Mon Sep 17 00:00:00 2001 From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com> Date: Thu, 26 Dec 2024 12:01:41 -0600 Subject: [PATCH] Add `FileMetadata` table and `RowMetadataRegistry` (#2) * port file metadata + hive_partition + row metadata + add tests * better docs * more comments * fix link * fix another comment * allow the same licenses as datafusion-orc * add unicode-3.0 license to allowlist --- Cargo.toml | 25 +- deny.toml | 10 +- src/lib.rs | 26 +- src/materialized.rs | 138 ++++ src/materialized/dependencies.rs | 16 + src/materialized/file_metadata.rs | 1130 ++++++++++++++++++++++++++++ src/materialized/hive_partition.rs | 268 +++++++ src/materialized/row_metadata.rs | 88 +++ 8 files changed, 1690 insertions(+), 11 deletions(-) create mode 100644 src/materialized/dependencies.rs create mode 100644 src/materialized/file_metadata.rs create mode 100644 src/materialized/hive_partition.rs create mode 100644 src/materialized/row_metadata.rs diff --git a/Cargo.toml b/Cargo.toml index 61de055..ff06e56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,29 @@ authors = ["Matthew Cramerus "] license = "Apache-2.0" description = "Materialized Views & Query Rewriting in DataFusion" keywords = ["arrow", "arrow-rs", "datafusion"] -rust-version = "1.73" +rust-version = "1.80" [dependencies] +arrow = "53" +arrow-schema = "53" +async-trait = "0.1" +dashmap = "6" +datafusion = "43" +datafusion-common = "43" +datafusion-expr = "43" +datafusion-functions = "43" +datafusion-functions-aggregate = "43" +datafusion-physical-expr = "43" +datafusion-physical-plan = "43" +datafusion-sql = "43" +futures = "0.3" +itertools = "0.13" +log = "0.4" +object_store = "0.11" + +[dev-dependencies] +anyhow = "1.0.95" +env_logger = "0.11.6" +tempfile = "3.14.0" +tokio = "1.42.0" +url = "2.5.4" diff --git a/deny.toml b/deny.toml index 86e4b1c..90ed3af 100644 --- a/deny.toml +++ b/deny.toml @@ -16,5 +16,13 @@ # under the License. [licenses] -allow = ["Apache-2.0"] +allow = [ + "Apache-2.0", + "Apache-2.0 WITH LLVM-exception", + "MIT", + "BSD-2-Clause", + "BSD-3-Clause", + "CC0-1.0", + "Unicode-3.0" +] version = 2 diff --git a/src/lib.rs b/src/lib.rs index 8c8c5af..766c5ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,14 +15,22 @@ // specific language governing permissions and limitations // under the License. -/// Code for incremental view maintenance. -mod materialized; +#![deny(missing_docs)] -/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views. -mod rewrite; +//! `datafusion-materialized-views` implements algorithms and functionality for materialized views in DataFusion. + +/// Code for incremental view maintenance against Hive-partitioned tables. +/// +/// An example of a Hive-partitioned table is the [`ListingTable`](datafusion::datasource::listing::ListingTable). +/// By analyzing the fragment of the materialized view query pertaining to the partition columns, +/// we can derive a build graph that relates the files of a materialized views and the files of the tables it depends on. +/// +/// A central trait is defined for Hive-partitioned tables, [`ListingTableLike`](materialized::ListingTableLike). 
Note that
+/// all implementations of [`ListingTableLike`](materialized::ListingTableLike) must be registered using the
+/// [`register_listing_table`](materialized::register_listing_table) function, otherwise the tables may not be detected by
+/// the incremental view maintenance code, including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata)
+/// or [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry).
+pub mod materialized;
 
-#[cfg(test)]
-mod test {
-    #[test]
-    fn test_it_works() {}
-}
+/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
+pub mod rewrite;
diff --git a/src/materialized.rs b/src/materialized.rs
index b248758..0151920 100644
--- a/src/materialized.rs
+++ b/src/materialized.rs
@@ -14,3 +14,141 @@
 // KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
+/// Track dependencies of materialized data in object storage
+mod dependencies;
+
+/// Pluggable metadata sources for incremental view maintenance
+pub mod row_metadata;
+
+/// A virtual table that exposes files in object storage.
+pub mod file_metadata;
+
+/// A UDF that parses Hive partition elements from object storage paths.
+mod hive_partition;
+
+use std::{
+    any::{type_name, Any, TypeId},
+    fmt::Debug,
+    sync::{Arc, LazyLock},
+};
+
+use dashmap::DashMap;
+use datafusion::{
+    catalog::TableProvider,
+    datasource::listing::{ListingTable, ListingTableUrl},
+};
+use datafusion_expr::LogicalPlan;
+use itertools::Itertools;
+
+/// The identifier of the column that [`RowMetadataSource`](row_metadata::RowMetadataSource) implementations should store row metadata in.
+pub const META_COLUMN: &str = "__meta";
+
+static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default);
+
+/// A [`TableProvider`] whose data is backed by Hive-partitioned files in object storage.
+pub trait ListingTableLike: TableProvider + 'static {
+    /// Object store URLs for this table
+    fn table_paths(&self) -> Vec<ListingTableUrl>;
+
+    /// Hive partition columns
+    fn partition_columns(&self) -> Vec<String>;
+
+    /// File extension used by this listing table
+    fn file_ext(&self) -> String;
+}
+
+impl ListingTableLike for ListingTable {
+    fn table_paths(&self) -> Vec<ListingTableUrl> {
+        self.table_paths().clone()
+    }
+
+    fn partition_columns(&self) -> Vec<String> {
+        self.options()
+            .table_partition_cols
+            .iter()
+            .map(|(name, _data_type)| name.clone())
+            .collect_vec()
+    }
+
+    fn file_ext(&self) -> String {
+        self.options().file_extension.clone()
+    }
+}
+
+/// Register a [`ListingTableLike`] implementation in this registry.
+/// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
+/// into a [`ListingTableLike`] where possible.
+pub fn register_listing_table<T: ListingTableLike>() {
+    TABLE_TYPE_REGISTRY.register_listing_table::<T>();
+}
+
+/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
+/// If the table's type has not been registered using [`register_listing_table`], this will return `None`.
+pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
+    TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
+}
+
+/// A Hive-partitioned table in object storage that is defined by a user-provided query.
+pub trait Materialized: ListingTableLike {
+    /// The query that defines this materialized view.
+    fn query(&self) -> LogicalPlan;
+}
+
+type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
+
+/// A registry for implementations of [`ListingTableLike`], used for downcasting
+/// arbitrary TableProviders into `dyn ListingTableLike` where possible.
+///
+/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike`.
+/// By default, [`ListingTable`] is registered.
+struct TableTypeRegistry {
+    listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
+}
+
+impl Debug for TableTypeRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TableTypeRegistry")
+            .field(
+                "listing_table_accessors",
+                &self
+                    .listing_table_accessors
+                    .iter()
+                    .map(|r| r.value().0)
+                    .collect_vec(),
+            )
+            .finish()
+    }
+}
+
+impl Default for TableTypeRegistry {
+    fn default() -> Self {
+        let new = Self {
+            listing_table_accessors: DashMap::new(),
+        };
+        new.register_listing_table::<ListingTable>();
+
+        new
+    }
+}
+
+impl TableTypeRegistry {
+    fn register_listing_table<T: ListingTableLike>(&self) {
+        self.listing_table_accessors.insert(
+            TypeId::of::<T>(),
+            (
+                type_name::<T>(),
+                Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)),
+            ),
+        );
+    }
+
+    fn cast_to_listing_table<'a>(
+        &'a self,
+        table: &'a dyn TableProvider,
+    ) -> Option<&'a dyn ListingTableLike> {
+        self.listing_table_accessors
+            .get(&table.as_any().type_id())
+            .and_then(|r| r.value().1(table.as_any()))
+    }
+}
diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs
new file mode 100644
index 0000000..b248758
--- /dev/null
+++ b/src/materialized/dependencies.rs
@@ -0,0 +1,16 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs
new file mode 100644
index 0000000..0e79e5d
--- /dev/null
+++ b/src/materialized/file_metadata.rs
@@ -0,0 +1,1130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
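+
+//! # Example
+//!
+//! A minimal sketch of wiring [`FileMetadata`] into a `SessionContext`, mirroring the tests at
+//! the bottom of this file; the `"file_metadata"` table name and the `t1` table in the query
+//! are illustrative assumptions, and the import path assumes this crate's public module layout:
+//!
+//! ```rust,ignore
+//! use std::sync::Arc;
+//! use datafusion::prelude::SessionContext;
+//! use datafusion_materialized_views::materialized::file_metadata::FileMetadata;
+//!
+//! async fn example(ctx: &SessionContext) -> datafusion::error::Result<()> {
+//!     // Expose object store metadata for every table known to the context's catalog list.
+//!     let file_metadata = Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list())));
+//!     ctx.register_table("file_metadata", file_metadata)?;
+//!
+//!     // Equality filters on table_catalog/table_schema/table_name are pushed down,
+//!     // so only the matching tables' object store prefixes are listed.
+//!     ctx.sql("SELECT file_path, size FROM file_metadata WHERE table_name = 't1'")
+//!         .await?
+//!         .show()
+//!         .await?;
+//!     Ok(())
+//! }
+//! ```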
+ +use arrow::array::{Int64Builder, StringBuilder, UInt64Builder}; +use arrow::record_batch::RecordBatch; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::catalog::SchemaProvider; +use datafusion::catalog::{CatalogProvider, Session}; +use datafusion::datasource::listing::ListingTableUrl; +use datafusion::datasource::{provider_as_source, TableProvider}; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties}; +use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal}; +use datafusion::physical_plan::limit::LimitStream; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PhysicalExpr, + PlanProperties, +}; +use datafusion::{ + catalog::CatalogProviderList, execution::TaskContext, physical_plan::SendableRecordBatchStream, +}; +use datafusion_common::{DataFusionError, Result, ScalarValue, ToDFSchema}; +use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType}; +use futures::stream::{self, BoxStream}; +use futures::{future, Future, FutureExt, StreamExt, TryStreamExt}; +use itertools::Itertools; +use log::debug; +use object_store::{ObjectMeta, ObjectStore}; +use std::any::Any; +use std::sync::Arc; + +use crate::materialized::cast_to_listing_table; +use crate::materialized::{hive_partition::hive_partition, META_COLUMN}; + +use super::row_metadata::RowMetadataSource; + +/// A virtual file metadata table, inspired by the information schema column table. +#[derive(Debug, Clone)] +pub struct FileMetadata { + table_schema: SchemaRef, + catalog_list: Arc, +} + +impl FileMetadata { + /// Construct a new [`FileMetadata`] table provider that lists files for all + /// tables in the provided catalog list. 
+ pub fn new(catalog_list: Arc) -> Self { + Self { + table_schema: Arc::new(Schema::new(vec![ + Field::new("table_catalog", DataType::Utf8, false), + Field::new("table_schema", DataType::Utf8, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("file_path", DataType::Utf8, false), + Field::new("last_modified", DataType::Int64, false), + Field::new("size", DataType::UInt64, false), + ])), + catalog_list, + } + } +} + +#[async_trait] +impl TableProvider for FileMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.table_schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + session_state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let dfschema = self.table_schema.clone().to_dfschema()?; + + let filters = filters + .iter() + .map(|expr| { + create_physical_expr(expr, &dfschema, session_state.execution_props()) + .map_err(|e| e.context("failed to create file metadata physical expr")) + }) + .collect::, _>>()?; + + let exec = FileMetadataExec::try_new( + self.table_schema.clone(), + projection.cloned(), + filters, + limit, + self.catalog_list.clone(), + )?; + + Ok(Arc::new(exec)) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } +} + +impl RowMetadataSource for FileMetadata { + fn name(&self) -> &str { + "FileMetadata" + } + + /// Scan for partition column values using object store metadata. + /// This allows us to efficiently scan for distinct partition column values without + /// ever reading from a table directly, which is useful for low-overhead + /// incremental view maintenance. + fn row_metadata( + self: Arc, + table: datafusion_sql::ResolvedTableReference, + scan: &datafusion_expr::TableScan, + ) -> Result { + use datafusion::datasource::source_as_provider; + use datafusion::functions::core::expr_fn::named_struct; + use datafusion::prelude::*; + + // Check that the remaining columns in the source table scans are indeed partition columns + let partition_cols = cast_to_listing_table(source_as_provider(&scan.source)?.as_ref()) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Table '{}' was not registered in TableTypeRegistry", + scan.table_name + )) + })? + .partition_columns(); + + for column in scan.projected_schema.columns() { + if !partition_cols.contains(&column.name) { + return Err(DataFusionError::Internal(format!("Row metadata not available on non-partition column from source table '{table}': {}", column.name))); + } + } + + let fields = scan.projected_schema.fields(); + + let row_metadata_expr = make_array(vec![named_struct(vec![ + lit("table_catalog"), + col("table_catalog"), + lit("table_schema"), + col("table_schema"), + lit("table_name"), + col("table_name"), + lit("source_uri"), // Map file_path to source_uri + col("file_path"), + lit("last_modified"), + col("last_modified"), + ])]) + .alias(META_COLUMN); + + datafusion_expr::LogicalPlanBuilder::scan("file_metadata", provider_as_source(self), None)? + .filter( + col("table_catalog") + .eq(lit(table.catalog.as_ref())) + .and(col("table_schema").eq(lit(table.schema.as_ref()))) + .and(col("table_name").eq(lit(table.table.as_ref()))), + )? 
+ .project( + fields + .iter() + .map(|field| { + // CAST(hive_partition(file_path, 'field_name', true) AS field_data_type) AS field_name + cast( + hive_partition(vec![col("file_path"), lit(field.name()), lit(true)]), + field.data_type().clone(), + ) + .alias(field.name()) + }) + .chain(Some(row_metadata_expr)), + ) + } +} + +/// An [`ExecutionPlan`] that scans object store metadata. +pub struct FileMetadataExec { + table_schema: SchemaRef, + plan_properties: PlanProperties, + projection: Option>, + filters: Vec>, + limit: Option, + metrics: ExecutionPlanMetricsSet, + catalog_list: Arc, +} + +impl FileMetadataExec { + fn try_new( + table_schema: SchemaRef, + projection: Option>, + filters: Vec>, + limit: Option, + catalog_list: Arc, + ) -> Result { + let projected_schema = match projection.as_ref() { + Some(projection) => Arc::new(table_schema.project(projection)?), + None => table_schema.clone(), + }; + let eq_properties = EquivalenceProperties::new(projected_schema); + let partitioning = Partitioning::UnknownPartitioning(1); + let execution_mode = ExecutionMode::Bounded; + let plan_properties = PlanProperties::new(eq_properties, partitioning, execution_mode); + + let exec = Self { + table_schema, + plan_properties, + projection, + filters, + limit, + metrics: ExecutionPlanMetricsSet::new(), + catalog_list, + }; + + Ok(exec) + } +} + +impl ExecutionPlan for FileMetadataExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "FileMetadataExec" + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let projection = self.projection.clone(); + let record_batches = self.build_record_batch(context)?; + + let projected_record_batches = record_batches.map(move |record_batches| { + let record_batches = match record_batches { + Ok(record_batches) => record_batches, + Err(err) => return vec![Err(err)], + }; + + if let Some(projection) = projection { + return record_batches + .into_iter() + .map(|record_batch| { + record_batch + .project(&projection) + .map_err(|e| DataFusionError::ArrowError(e, None)) + }) + .collect::>(); + } + + record_batches.into_iter().map(Ok).collect::>() + }); + + let mut record_batch_stream: SendableRecordBatchStream = + Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(projected_record_batches) + .map(stream::iter) + .flatten(), + )); + + if let Some(limit) = self.limit { + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + let limit_stream = + LimitStream::new(record_batch_stream, 0, Some(limit), baseline_metrics); + record_batch_stream = Box::pin(limit_stream); + } + + Ok(record_batch_stream) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} + +impl FileMetadataExec { + fn get_column_index(&self, column_name: &str) -> Result { + let (index, _) = self + .table_schema + .column_with_name(column_name) + .ok_or_else(|| { + DataFusionError::Internal(format!("column '{column_name}' does not exists")) + })?; + Ok(index) + } + + /// Get the string literal value from an 'equals' BinaryExpr with a column. 
+ fn get_column_literal(column_idx: usize, filter: &Arc) -> Option { + let binary_expr = filter.as_any().downcast_ref::()?; + + if !matches!(binary_expr.op(), Operator::Eq) { + return None; + } + + let (column, literal) = if let Some(left_column) = + binary_expr.left().as_any().downcast_ref::() + { + let right_literal = binary_expr.right().as_any().downcast_ref::()?; + (left_column, right_literal) + } else if let Some(right_column) = binary_expr.right().as_any().downcast_ref::() { + let left_literal = binary_expr.left().as_any().downcast_ref::()?; + (right_column, left_literal) + } else { + return None; + }; + + if column.index() != column_idx { + return None; + } + + match literal.value() { + ScalarValue::Utf8(str) => str.clone(), + ScalarValue::LargeUtf8(str) => str.clone(), + _ => None, + } + } + + /// Builds a RecordBatch containing file metadata that satisfies the provided filters. + fn build_record_batch( + &self, + context: Arc, + ) -> Result>>> { + let catalog_column = self.get_column_index("table_catalog")?; + let schema_column = self.get_column_index("table_schema")?; + let table_column = self.get_column_index("table_name")?; + + let catalog_name = self + .filters + .iter() + .filter_map(|filter| Self::get_column_literal(catalog_column, filter)) + .next(); + + let schema_name = self + .filters + .iter() + .filter_map(|filter| Self::get_column_literal(schema_column, filter)) + .next(); + + let table_name = self + .filters + .iter() + .filter_map(|filter| Self::get_column_literal(table_column, filter)) + .next(); + + let table_schema = self.table_schema.clone(); + let catalog_list = self.catalog_list.clone(); + + let record_batch = async move { + // If we cannot determine the catalog, build from the entire catalog list. + let catalog_name = match catalog_name { + Some(catalog_name) => catalog_name, + None => { + debug!("No catalog filter exists, returning entire catalog list."); + return FileMetadataBuilder::build_from_catalog_list( + catalog_list, + table_schema, + context, + ) + .await; + } + }; + + // If the specified catalog doesn't exist, return an empty result; + let catalog_provider = match catalog_list.catalog(&catalog_name) { + Some(catalog_provider) => catalog_provider, + None => { + debug!("No catalog named '{catalog_name}' exists, returning an empty result."); + return Ok(vec![]); + } + }; + + // If we cannot determine the schema, build from the catalog. + let schema_name = match schema_name { + Some(schema_name) => schema_name, + None => { + debug!("No schema filter exists, returning catalog '{catalog_name}'."); + return FileMetadataBuilder::build_from_catalog( + &catalog_name, + catalog_provider, + table_schema, + context, + ) + .await; + } + }; + + // If the specified schema doesn't exist, return an empty result. + let schema_provider = match catalog_provider.schema(&schema_name) { + Some(schema_provider) => schema_provider, + None => { + debug!("No schema named '{catalog_name}.{schema_name}' exists, returning an empty result."); + return Ok(vec![]); + } + }; + + // If we cannot determine a table , build from the schema. + let table_name = match table_name { + Some(table_name) => table_name, + None => { + debug!( + "No table filter exists, returning schema '{catalog_name}.{schema_name}'." 
+ ); + return FileMetadataBuilder::build_from_schema( + &catalog_name, + &schema_name, + schema_provider, + table_schema, + context, + ) + .await; + } + }; + + // If the specified table doesn't exist, return an empty result; + let table_provider = match schema_provider.table(&table_name).await? { + Some(table_provider) => table_provider, + None => { + debug!("No table named '{catalog_name}.{schema_name}.{table_name}' exists, returning an empty result."); + return Ok(vec![]); + } + }; + + debug!("Returning table '{catalog_name}.{schema_name}.{table_name}'."); + + let record_batch = FileMetadataBuilder::build_from_table( + &catalog_name, + &schema_name, + &table_name, + table_provider, + table_schema, + context, + ) + .await?; + + Ok(record_batch.into_iter().collect_vec()) + }; + + Ok(record_batch) + } +} + +impl std::fmt::Debug for FileMetadataExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileMetadataExec") + .field("plan_properties", &self.plan_properties) + .field("filters", &self.filters) + .field("limit", &self.limit) + .finish() + } +} + +impl DisplayAs for FileMetadataExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "FileMetadataExec: ")?; + + write!(f, "filters=[")?; + let mut filters = self.filters.iter().peekable(); + while let Some(filter) = filters.next() { + std::fmt::Display::fmt(filter, f)?; + if filters.peek().is_some() { + write!(f, ", ")?; + } + } + write!(f, "]")?; + + if let Some(limit) = &self.limit { + write!(f, ", limit={limit}")?; + } + + Ok(()) + } +} + +struct FileMetadataBuilder { + schema: SchemaRef, + catalog_names: StringBuilder, + schema_names: StringBuilder, + table_names: StringBuilder, + file_paths: StringBuilder, + last_modified: Int64Builder, + size: UInt64Builder, +} + +impl FileMetadataBuilder { + fn new(schema: SchemaRef) -> Self { + Self { + schema, + catalog_names: StringBuilder::new(), + schema_names: StringBuilder::new(), + table_names: StringBuilder::new(), + file_paths: StringBuilder::new(), + last_modified: Int64Builder::new(), + size: UInt64Builder::new(), + } + } + + async fn build_from_catalog_list( + catalog_list: Arc, + schema: SchemaRef, + context: Arc, + ) -> Result> { + let mut tasks = vec![]; + + for catalog_name in catalog_list.catalog_names() { + let catalog_provider = match catalog_list.catalog(&catalog_name) { + Some(catalog_provider) => catalog_provider, + None => continue, + }; + let schema = schema.clone(); + let context = context.clone(); + + tasks.push(async move { + Self::build_from_catalog(&catalog_name, catalog_provider, schema, context).await + }); + } + + let results = future::join_all(tasks).await; + + let record_batches = results + .into_iter() + .collect::, _>>()? + .into_iter() + .flatten() + .collect(); + + Ok(record_batches) + } + + async fn build_from_catalog( + catalog_name: &str, + catalog_provider: Arc, + schema: SchemaRef, + context: Arc, + ) -> Result> { + let mut tasks = vec![]; + + for schema_name in catalog_provider.schema_names() { + let schema_provider = match catalog_provider.schema(&schema_name) { + Some(schema_provider) => schema_provider, + None => continue, + }; + let schema = schema.clone(); + let context = context.clone(); + + tasks.push(async move { + Self::build_from_schema( + catalog_name, + &schema_name, + schema_provider, + schema, + context, + ) + .await + }); + } + + let results = future::join_all(tasks).await; + + let record_batches = results + .into_iter() + .collect::, _>>()? 
+ .into_iter() + .flatten() + .collect(); + + Ok(record_batches) + } + + async fn build_from_schema( + catalog_name: &str, + schema_name: &str, + schema_provider: Arc, + schema: SchemaRef, + context: Arc, + ) -> Result> { + let mut tasks = vec![]; + + for table_name in schema_provider.table_names() { + let table_provider = match schema_provider.table(&table_name).await? { + Some(table_provider) => table_provider, + None => continue, + }; + let schema = schema.clone(); + let context = context.clone(); + + tasks.push(async move { + Self::build_from_table( + catalog_name, + schema_name, + &table_name, + table_provider, + schema, + context, + ) + .await + }) + } + + let results = future::join_all(tasks).await; + let record_batches = results + .into_iter() + .collect::, _>>()? + .into_iter() + .flatten() + .collect(); + + Ok(record_batches) + } + + async fn build_from_table( + catalog_name: &str, + schema_name: &str, + table_name: &str, + table_provider: Arc, + schema: SchemaRef, + context: Arc, + ) -> Result> { + let mut builder = Self::new(schema.clone()); + + let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) { + None => return Ok(None), + Some(t) => t, + }; + + let table_paths = listing_table_like.table_paths(); + let file_extension = listing_table_like.file_ext(); + + for table_path in table_paths { + builder + .read_table_files( + catalog_name, + schema_name, + table_name, + &table_path, + &file_extension, + &context, + ) + .await?; + } + + builder.finish().map(Some) + } + + async fn read_table_files( + &mut self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + table_path: &ListingTableUrl, + file_ext: &str, + context: &TaskContext, + ) -> Result<()> { + let store_url = table_path.object_store(); + let store = context.runtime_env().object_store(table_path)?; + + let mut file_stream = list_all_files( + store.as_ref(), + table_path, + file_ext, + context + .session_config() + .options() + .execution + .listing_table_ignore_subdirectory, + ) + .await; + + while let Some(file_meta) = file_stream.try_next().await? { + self.append( + catalog_name, + schema_name, + table_name, + &store_url, + &file_meta, + ); + } + + Ok(()) + } + + fn append( + &mut self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + store_url: &ObjectStoreUrl, + meta: &ObjectMeta, + ) { + self.catalog_names.append_value(catalog_name); + self.schema_names.append_value(schema_name); + self.table_names.append_value(table_name); + self.file_paths + .append_value(format!("{store_url}{}", meta.location)); + self.last_modified + .append_option(meta.last_modified.timestamp_nanos_opt()); + self.size.append_value(meta.size as u64); // this is not lossy assuming we're on a 64-bit platform + } + + fn finish(mut self) -> Result { + RecordBatch::try_new( + self.schema, + vec![ + Arc::new(self.catalog_names.finish()), + Arc::new(self.schema_names.finish()), + Arc::new(self.table_names.finish()), + Arc::new(self.file_paths.finish()), + Arc::new(self.last_modified.finish()), + Arc::new(self.size.finish()), + ], + ) + .map_err(From::from) + } +} + +// Mostly copied from ListingTableUrl::list_all_files, which is private to that crate +// Modified to handle empty tables +async fn list_all_files<'a>( + store: &'a dyn ObjectStore, + url: &'a ListingTableUrl, + file_extension: &'a str, + ignore_subdirectory: bool, +) -> BoxStream<'a, Result> { + // Check if the directory exists yet + if let Err(object_store::Error::NotFound { path, .. 
}) = + store.list_with_delimiter(Some(url.prefix())).await + { + debug!( + "attempted to list empty table at {path} during file_metadata listing, returning empty list" + ); + return Box::pin(stream::empty()); + } + + let is_dir = url.as_str().ends_with('/'); + let list = match is_dir { + true => store.list(Some(url.prefix())), + false => futures::stream::once(store.head(url.prefix())).boxed(), + }; + + list.map_err(Into::into) + .try_filter(move |meta| { + let path = &meta.location; + let extension_match = path.as_ref().ends_with(file_extension); + let glob_match = url.contains(path, ignore_subdirectory); + futures::future::ready(extension_match && glob_match) + }) + .boxed() +} + +#[cfg(test)] +mod test { + use std::{ops::Deref, sync::Arc}; + + use anyhow::{Context, Result}; + use datafusion::{ + assert_batches_sorted_eq, + catalog_common::{MemoryCatalogProvider, MemorySchemaProvider}, + execution::{ + object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, + runtime_env::RuntimeEnvBuilder, + }, + prelude::{SessionConfig, SessionContext}, + }; + use object_store::local::LocalFileSystem; + use tempfile::TempDir; + use url::Url; + + use super::FileMetadata; + + struct TestContext { + _dir: TempDir, + ctx: SessionContext, + } + + impl Deref for TestContext { + type Target = SessionContext; + + fn deref(&self) -> &Self::Target { + &self.ctx + } + } + + async fn setup() -> Result { + let _ = env_logger::builder().is_test(true).try_init(); + + let dir = TempDir::new().context("create tempdir")?; + let store = LocalFileSystem::new_with_prefix(&dir) + .map(Arc::new) + .context("create local file system object store")?; + + let registry = Arc::new(DefaultObjectStoreRegistry::new()); + registry + .register_store(&Url::parse("file://").unwrap(), store) + .context("register file system store") + .expect("should replace existing object store at file://"); + + let ctx = SessionContext::new_with_config_rt( + SessionConfig::new(), + RuntimeEnvBuilder::new() + .with_object_store_registry(registry) + .build_arc() + .context("create RuntimeEnv")?, + ); + + ctx.catalog("datafusion") + .context("get default catalog")? + .register_schema("private", Arc::::default()) + .context("register datafusion.private schema")?; + + ctx.register_catalog("datafusion_mv", Arc::::default()); + + ctx.catalog("datafusion_mv") + .context("get datafusion_mv catalog")? + .register_schema("public", Arc::::default()) + .context("register datafusion_mv.public schema")?; + + ctx.sql( + " + CREATE EXTERNAL TABLE t1 (num INTEGER, year TEXT) + STORED AS CSV + PARTITIONED BY (year) + LOCATION 'file:///t1/' + ", + ) + .await? + .collect() + .await?; + + ctx.sql( + "INSERT INTO t1 VALUES + (1, '2021'), + (2, '2022'), + (3, '2023'), + (4, '2024') + ", + ) + .await? + .collect() + .await?; + + ctx.sql( + " + CREATE EXTERNAL TABLE private.t1 (num INTEGER, year TEXT, month TEXT) + STORED AS CSV + PARTITIONED BY (year, month) + LOCATION 'file:///private/t1/' + ", + ) + .await? + .collect() + .await?; + + ctx.sql( + "INSERT INTO private.t1 VALUES + (1, '2021', '01'), + (2, '2022', '02'), + (3, '2023', '03'), + (4, '2024', '04') + ", + ) + .await? + .collect() + .await?; + + ctx.sql( + " + CREATE EXTERNAL TABLE datafusion_mv.public.t3 (num INTEGER, date DATE) + STORED AS CSV + PARTITIONED BY (date) + LOCATION 'file:///datafusion_mv/public/t3/' + ", + ) + .await? 
+ .collect() + .await?; + + ctx.sql( + "INSERT INTO datafusion_mv.public.t3 VALUES + (1, '2021-01-01'), + (2, '2022-02-02'), + (3, '2023-03-03'), + (4, '2024-04-04') + ", + ) + .await? + .collect() + .await?; + + ctx.register_table( + "file_metadata", + Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))), + ) + .context("register file metadata table")?; + + ctx.sql( + // Remove timestamps and trim (randomly generated) file names since they're not stable in tests + "CREATE VIEW file_metadata_test_view AS SELECT + * EXCLUDE(file_path, last_modified), + regexp_replace(file_path, '/[^/]*$', '/') AS file_path + FROM file_metadata", + ) + .await + .context("create file metadata test view")?; + + Ok(TestContext { _dir: dir, ctx }) + } + + #[tokio::test] + async fn test_list_all_files() -> Result<()> { + let ctx = setup().await.context("setup")?; + + let results = ctx + .sql("SELECT * FROM file_metadata_test_view") + .await? + .collect() + .await?; + + assert_batches_sorted_eq!(&[ + "+---------------+--------------+------------+------+--------------------------------------------------+", + "| table_catalog | table_schema | table_name | size | file_path |", + "+---------------+--------------+------------+------+--------------------------------------------------+", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2021/month=01/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2022/month=02/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2023/month=03/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2024/month=04/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2021/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2022/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2023/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2024/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2021-01-01/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2022-02-02/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2023-03-03/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2024-04-04/ |", + "+---------------+--------------+------------+------+--------------------------------------------------+", + ], &results); + + Ok(()) + } + + #[tokio::test] + async fn test_list_catalog() -> Result<()> { + let ctx = setup().await.context("setup")?; + + let results = ctx + .sql( + "SELECT * FROM file_metadata_test_view + WHERE table_catalog = 'datafusion_mv'", + ) + .await? 
+ .collect() + .await?; + + assert_batches_sorted_eq!(&[ + "+---------------+--------------+------------+------+--------------------------------------------------+", + "| table_catalog | table_schema | table_name | size | file_path |", + "+---------------+--------------+------------+------+--------------------------------------------------+", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2021-01-01/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2022-02-02/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2023-03-03/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2024-04-04/ |", + "+---------------+--------------+------------+------+--------------------------------------------------+", + ], &results); + + Ok(()) + } + + #[tokio::test] + async fn test_list_catalog_and_schema() -> Result<()> { + let ctx = setup().await.context("setup")?; + + let results = ctx + .sql( + "SELECT * FROM file_metadata_test_view + WHERE table_catalog = 'datafusion' AND table_schema = 'private'", + ) + .await? + .collect() + .await?; + + assert_batches_sorted_eq!(&[ + "+---------------+--------------+------------+------+----------------------------------------+", + "| table_catalog | table_schema | table_name | size | file_path |", + "+---------------+--------------+------------+------+----------------------------------------+", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2021/month=01/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2022/month=02/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2023/month=03/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2024/month=04/ |", + "+---------------+--------------+------------+------+----------------------------------------+", + ], &results); + + Ok(()) + } + + #[tokio::test] + async fn test_list_schema_only() -> Result<()> { + let ctx = setup().await.context("setup")?; + + let results = ctx + .sql( + "SELECT * FROM file_metadata_test_view + WHERE table_schema = 'public'", + ) + .await? + .collect() + .await?; + + assert_batches_sorted_eq!(&[ + "+---------------+--------------+------------+------+--------------------------------------------------+", + "| table_catalog | table_schema | table_name | size | file_path |", + "+---------------+--------------+------------+------+--------------------------------------------------+", + "| datafusion | public | t1 | 6 | file:///t1/year=2021/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2022/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2023/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2024/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2021-01-01/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2022-02-02/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2023-03-03/ |", + "| datafusion_mv | public | t3 | 6 | file:///datafusion_mv/public/t3/date=2024-04-04/ |", + "+---------------+--------------+------------+------+--------------------------------------------------+", + ], &results); + + Ok(()) + } + + #[tokio::test] + async fn test_list_catalog_schema_and_table() -> Result<()> { + let ctx = setup().await.context("setup")?; + + let results = ctx + .sql( + "SELECT * FROM file_metadata_test_view + WHERE table_catalog = 'datafusion' AND table_schema = 'public' AND table_name = 't1'", + ) + .await? 
+ .collect() + .await?; + + assert_batches_sorted_eq!( + &[ + "+---------------+--------------+------------+------+-----------------------+", + "| table_catalog | table_schema | table_name | size | file_path |", + "+---------------+--------------+------------+------+-----------------------+", + "| datafusion | public | t1 | 6 | file:///t1/year=2021/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2022/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2023/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2024/ |", + "+---------------+--------------+------------+------+-----------------------+", + ], + &results + ); + + Ok(()) + } + + #[tokio::test] + async fn test_list_table_only() -> Result<()> { + let ctx = setup().await.context("setup")?; + + let results = ctx + .sql( + "SELECT * FROM file_metadata_test_view + WHERE table_name = 't1'", + ) + .await? + .collect() + .await?; + + assert_batches_sorted_eq!( + &[ + "+---------------+--------------+------------+------+----------------------------------------+", + "| table_catalog | table_schema | table_name | size | file_path |", + "+---------------+--------------+------------+------+----------------------------------------+", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2021/month=01/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2022/month=02/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2023/month=03/ |", + "| datafusion | private | t1 | 6 | file:///private/t1/year=2024/month=04/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2021/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2022/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2023/ |", + "| datafusion | public | t1 | 6 | file:///t1/year=2024/ |", + "+---------------+--------------+------------+------+----------------------------------------+", + ], + &results + ); + + Ok(()) + } +} diff --git a/src/materialized/hive_partition.rs b/src/materialized/hive_partition.rs new file mode 100644 index 0000000..34bb7ae --- /dev/null +++ b/src/materialized/hive_partition.rs @@ -0,0 +1,268 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
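+
+//! # Example
+//!
+//! A minimal, crate-internal sketch of how the [`hive_partition`] expression helper defined
+//! below is used (mirroring `FileMetadata::row_metadata`); the `file_path` column and the
+//! `year` partition column are illustrative assumptions:
+//!
+//! ```rust,ignore
+//! use arrow_schema::DataType;
+//! use datafusion::prelude::*;
+//!
+//! use crate::materialized::hive_partition::hive_partition;
+//!
+//! // CAST(hive_partition(file_path, 'year', true) AS Int32) AS year
+//! // The trailing `true` asks for NULL instead of an error when the partition key is missing.
+//! let year = cast(
+//!     hive_partition(vec![col("file_path"), lit("year"), lit(true)]),
+//!     DataType::Int32,
+//! )
+//! .alias("year");
+//! ```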
+ +use std::sync::Arc; + +use arrow::array::{Array, StringArray, StringBuilder}; +use arrow_schema::DataType; + +use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_expr::{ + expr::ScalarFunction, ColumnarValue, Expr, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, + Volatility, +}; + +pub static HIVE_PARTITION_UDF_NAME: &str = "hive_partition"; + +/// A DataFusion UDF with the following signature: +/// +/// ```ignore +/// hive_partition(file_path: Utf8, partition_column: Utf8, null_if_missing: bool = false) -> Utf8 +/// ``` +/// +/// that extracts the partition column from a path, assuming the file path follows the Hive partitioning scheme. +/// Partition values are returned as strings; +/// use the ARROW_CAST function to coerce the partition values into the correct data type. +/// +/// # `null_if_missing` +/// +/// By default, `hive_partition` throws an error if the named partition column is absent. +/// One can instead return null in such cases by passing a third argument: +/// ```sql +/// hive_partition(, , true) +/// ``` +/// which will not throw an error if is absent from . +/// +/// # Examples +/// +/// ```sql +/// SELECT +/// column_name, +/// hive_partition( +/// 's3://atlas/sip/trades/year=2006/month=01/day=02/trades-2006-01-02.parquet', +/// column_name +/// ) AS partition_value +/// FROM (VALUES ('year'), ('month'), ('day')) AS partition_columns (column_name); +/// +/// // +-------------+-----------------+ +/// // | column_name | partition_value | +/// // +-------------+-----------------+ +/// // | year | 2006 | +/// // | month | 01 | +/// // | day | 02 | +/// // +-------------+-----------------+ +/// ``` +pub fn hive_partition_udf() -> ScalarUDF { + let signature = Signature::one_of( + vec![ + TypeSignature::Uniform(2, vec![DataType::Utf8]), + TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8, DataType::Boolean]), + ], + Volatility::Immutable, + ); + + let udf_impl = HivePartitionUdf { signature }; + ScalarUDF::new_from_impl(udf_impl) +} + +#[derive(Debug)] +struct HivePartitionUdf { + pub signature: Signature, +} + +impl ScalarUDFImpl for HivePartitionUdf { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + HIVE_PARTITION_UDF_NAME + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn invoke(&self, values: &[ColumnarValue]) -> Result { + let null_if_missing = values + .get(2) + .map(|val| match val { + ColumnarValue::Scalar(ScalarValue::Boolean(Some(b))) => Ok(*b), + _ => Err(DataFusionError::Execution( + "expected a boolean scalar for argument #3 of 'hive_partition'".to_string(), + )), + }) + .transpose()? 
+ .unwrap_or(false); + + let arrays = ColumnarValue::values_to_arrays(values)?; + + let [file_paths, table_partition_columns]: [Option<&StringArray>; 2] = + [&arrays[0], &arrays[1]].map(|arg| arg.as_any().downcast_ref()); + + file_paths + .zip(table_partition_columns) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Error while executing {HIVE_PARTITION_UDF_NAME}: \ + Type check failed" + )) + }) + .and_then(|(file_paths, table_partition_columns)| { + let mut builder = StringBuilder::new(); + + for (file_path, table_partition_column) in + file_paths.iter().zip(table_partition_columns.iter()) + { + match file_path.zip(table_partition_column) { + Some((file_path, table_partition_column)) => { + builder.append_option(parse_partitions_for_path( + file_path, + table_partition_column, + null_if_missing, + )?); + } + _ => builder.append_null(), + }; + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + }) + } +} + +pub fn hive_partition(args: Vec) -> Expr { + Expr::ScalarFunction(ScalarFunction { + func: Arc::new(hive_partition_udf()), + args, + }) +} + +// Extract partition column values from a file path, erroring if any partition columns are not found. +// Accepts a ListingTableUrl that will be trimmed from the file paths as well. +fn parse_partitions_for_path<'a>( + file_path: &'a str, + table_partition_col: &str, + null_if_missing: bool, +) -> Result> { + match file_path.split('/').find_map(|part| { + part.split_once('=') + .and_then(|(name, val)| (name == table_partition_col).then_some(val)) + }) { + Some(part) => Ok(Some(part)), + None if null_if_missing => Ok(None), + _ => Err(DataFusionError::Execution(format!( + "Error while executing {HIVE_PARTITION_UDF_NAME}: \ + Path '{file_path}' did not contain any values corresponding to \ + partition column '{table_partition_col}'" + ))), + } +} + +#[cfg(test)] +mod test { + use super::HIVE_PARTITION_UDF_NAME; + + use datafusion::{assert_batches_sorted_eq, prelude::SessionContext}; + use datafusion_common::Result; + + struct TestCase { + file_path_expr: &'static str, + partition_columns: &'static [&'static str], + expected_output: &'static str, + } + + async fn run_test(context: &SessionContext, case: TestCase) -> Result<()> { + let query = format!( + "SELECT \ + column_name, \ + {HIVE_PARTITION_UDF_NAME}({}, column_name) AS partition_value \ + FROM (VALUES {}) AS partition_columns (column_name)", + case.file_path_expr, + case.partition_columns + .iter() + .map(|s| format!("('{s}')")) + .collect::>() + .join(", ") + ); + + let df = context.sql(dbg!(&query)).await?; + df.clone().show().await?; + + let results = df.collect().await?; + assert_batches_sorted_eq!( + case.expected_output + .split_terminator('\n') + .filter(|s| !s.is_empty()) + .map(str::trim) + .collect::>(), + &results + ); + + Ok(()) + } + + #[tokio::test] + async fn test_extract_hive_partitions() { + let context = SessionContext::new(); + context.register_udf(super::hive_partition_udf()); + + let cases = vec![TestCase { + file_path_expr: "'sip/trades/year=2006/month=01/day=02/trades-2006-01-02.parquet'", + partition_columns: &["year", "month", "day"], + expected_output: " + +-------------+-----------------+ + | column_name | partition_value | + +-------------+-----------------+ + | year | 2006 | + | month | 01 | + | day | 02 | + +-------------+-----------------+", + }]; + + for case in cases { + run_test(&context, case).await.unwrap(); + } + } + + #[tokio::test] + async fn test_extract_hive_partitions_fails() { + let context = SessionContext::new(); + 
        context.register_udf(super::hive_partition_udf());
+
+        let cases = vec![
+            TestCase {
+                file_path_expr: "'sip/trades/year=2006/month=01/day=02/trades-2006-01-02.parquet'",
+                partition_columns: &["this-is-not-a-valid-partition-column"],
+                expected_output: "",
+            },
+            TestCase {
+                file_path_expr: "1", // numbers should fail the type check
+                partition_columns: &["year", "month", "day"],
+                expected_output: "",
+            },
+        ];
+
+        for case in cases {
+            dbg!(run_test(&context, case).await).expect_err("test should fail");
+        }
+    }
+}
diff --git a/src/materialized/row_metadata.rs b/src/materialized/row_metadata.rs
new file mode 100644
index 0000000..d0df6e0
--- /dev/null
+++ b/src/materialized/row_metadata.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use dashmap::DashMap;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::{LogicalPlanBuilder, TableScan};
+use datafusion_sql::ResolvedTableReference;
+use std::sync::Arc;
+
+/// Registry that manages metadata sources for different tables.
+/// Provides a centralized way to register and retrieve metadata sources
+/// that can be used to obtain row-level metadata for tables.
+#[derive(Default)]
+pub struct RowMetadataRegistry {
+    metadata_sources: DashMap<String, Arc<dyn RowMetadataSource>>,
+}
+
+impl RowMetadataRegistry {
+    /// Registers a metadata source for a specific table.
+    /// Returns the previously registered source for this table, if any.
+    pub fn register_source(
+        &self,
+        table: &ResolvedTableReference,
+        source: Arc<dyn RowMetadataSource>,
+    ) -> Option<Arc<dyn RowMetadataSource>> {
+        self.metadata_sources.insert(table.to_string(), source)
+    }
+
+    /// Retrieves the registered [`RowMetadataSource`] for a specific table.
+    pub fn get_source(&self, table: &ResolvedTableReference) -> Result<Arc<dyn RowMetadataSource>> {
+        self.metadata_sources
+            .get(&table.to_string())
+            .map(|o| Arc::clone(o.value()))
+            .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table)))
+    }
+}
+
+/// A source for "row metadata" that associates rows from a table with
+/// metadata used for incremental view maintenance.
+///
+/// Most use cases should default to using [`FileMetadata`](super::file_metadata::FileMetadata) for their [`RowMetadataSource`],
+/// which uses object store metadata to perform incremental view maintenance on Hive-partitioned tables.
+/// However, in some use cases it is necessary to track metadata at a more granular level than Hive partitions.
+/// In such cases, users may implement a custom [`RowMetadataSource`] containing this metadata.
+///
+/// A [`RowMetadataSource`] may contain metadata for multiple tables.
+/// As such, it is the user's responsibility to register each table with the appropriate
+/// [`RowMetadataSource`] in the [`RowMetadataRegistry`].
+pub trait RowMetadataSource: Send + Sync {
+    /// The name of this row metadata source.
+    fn name(&self) -> &str;
+
+    /// Rewrite this [`TableScan`] as a query against this [`RowMetadataSource`],
+    /// this time adding a new struct column, `__meta`, whose shape conforms to the following schema:
+    ///
+    /// ```json
+    /// {
+    ///     "table_catalog": "string",
+    ///     "table_schema": "string",
+    ///     "table_name": "string",
+    ///     "source_uri": "string",
+    ///     "last_modified": "timestamp"
+    /// }
+    /// ```
+    ///
+    /// The returned data should contain the original table scan, up to multiplicity.
+    /// That is, for each row in the original table scan, the [`RowMetadataSource`] should contain at least
+    /// one row (but potentially more) with the same values, plus the `__meta` column.
+    fn row_metadata(
+        self: Arc<Self>,
+        table: ResolvedTableReference,
+        scan: &TableScan,
+    ) -> Result<LogicalPlanBuilder>;
+}
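+
+// A minimal usage sketch for `RowMetadataRegistry`: register `FileMetadata` (the default
+// object-store-backed `RowMetadataSource`) for an illustrative table reference, then look it
+// up again. The catalog/schema/table names below are assumptions made for this example.
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use datafusion::prelude::SessionContext;
+    use datafusion_sql::ResolvedTableReference;
+
+    use super::RowMetadataRegistry;
+    use crate::materialized::file_metadata::FileMetadata;
+
+    #[test]
+    fn register_and_get_source() {
+        let ctx = SessionContext::new();
+        let file_metadata = Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list())));
+
+        let registry = RowMetadataRegistry::default();
+        let table = ResolvedTableReference {
+            catalog: Arc::from("datafusion"),
+            schema: Arc::from("public"),
+            table: Arc::from("t1"),
+        };
+
+        // First registration returns None, since no source was previously registered.
+        assert!(registry.register_source(&table, file_metadata).is_none());
+
+        // The source can now be resolved when planning incremental view maintenance.
+        assert!(registry.get_source(&table).is_ok());
+    }
+}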