From a3de568e13d1a1e1bc130b4478c01c2b3c4a8a1d Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Tue, 7 Jan 2025 21:09:19 +0000 Subject: [PATCH 1/2] new decorator api --- src/materialized.rs | 45 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/src/materialized.rs b/src/materialized.rs index 10c44d5..4a0a7ac 100644 --- a/src/materialized.rs +++ b/src/materialized.rs @@ -89,7 +89,13 @@ pub fn register_listing_table() { /// Attempt to cast the given TableProvider into a [`ListingTableLike`]. /// If the table's type has not been registered using [`register_listing_table`], will return `None`. pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> { - TABLE_TYPE_REGISTRY.cast_to_listing_table(table) + TABLE_TYPE_REGISTRY + .cast_to_listing_table(table) + .or_else(|| { + TABLE_TYPE_REGISTRY + .cast_to_decorator(table) + .and_then(|decorator| cast_to_listing_table(decorator.base())) + }) } /// A hive-partitioned table in object storage that is defined by a user-provided query. @@ -110,7 +116,24 @@ pub fn register_materialized() { /// Attempt to cast the given TableProvider into a [`Materialized`]. /// If the table's type has not been registered using [`register_materialized`], will return `None`. pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> { - TABLE_TYPE_REGISTRY.cast_to_materialized(table) + TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| { + TABLE_TYPE_REGISTRY + .cast_to_decorator(table) + .and_then(|decorator| cast_to_materialized(decorator.base())) + }) +} + +/// A `TableProvider` that decorates other `TableProvider`s. +/// Sometimes users may implement a `TableProvider` that overrides functionality of a base `TableProvider`. +/// This API allows the decorator to also be recognized as `ListingTableLike` or `Materialized` automatically. +pub trait Decorator: TableProvider + 'static { + /// The underlying `TableProvider` that this decorator wraps. + fn base(&self) -> &dyn TableProvider; +} + +/// Register `T` as a [`Decorator`]. +pub fn register_decorator() { + TABLE_TYPE_REGISTRY.register_decorator::() } type Downcaster = Arc Option<&T> + Send + Sync>; @@ -123,6 +146,7 @@ type Downcaster = Arc Option<&T> + Send + Sync>; struct TableTypeRegistry { listing_table_accessors: DashMap)>, materialized_accessors: DashMap)>, + decorator_accessors: DashMap)>, } impl Debug for TableTypeRegistry { @@ -145,6 +169,7 @@ impl Default for TableTypeRegistry { let new = Self { listing_table_accessors: DashMap::new(), materialized_accessors: DashMap::new(), + decorator_accessors: DashMap::new(), }; new.register_listing_table::(); @@ -175,6 +200,16 @@ impl TableTypeRegistry { self.register_listing_table::(); } + fn register_decorator(&self) { + self.decorator_accessors.insert( + TypeId::of::(), + ( + type_name::(), + Arc::new(|any| any.downcast_ref::().map(|t| t as &dyn Decorator)), + ), + ); + } + fn cast_to_listing_table<'a>( &'a self, table: &'a dyn TableProvider, @@ -192,4 +227,10 @@ impl TableTypeRegistry { .get(&table.as_any().type_id()) .and_then(|r| r.value().1(table.as_any())) } + + fn cast_to_decorator<'a>(&'a self, table: &'a dyn TableProvider) -> Option<&'a dyn Decorator> { + self.decorator_accessors + .get(&table.as_any().type_id()) + .and_then(|r| r.value().1(table.as_any())) + } } From 7de1622f4cef8e1e7bbc1f18bdb2e9a5963c5457 Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Wed, 8 Jan 2025 10:52:53 -0600 Subject: [PATCH 2/2] exercise decorator in tests --- src/materialized/dependencies.rs | 66 +++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index d56f6fe..760fada 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -107,8 +107,8 @@ impl TableFunctionImpl for FileDependenciesUdtf { let table = util::get_table(self.catalog_list.as_ref(), &table_ref) .map_err(|e| DataFusionError::Plan(e.to_string()))?; - let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan( - "mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized".to_string(), + let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(format!( + "mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"), ))?; Ok(Arc::new(ViewTable::try_new( @@ -846,9 +846,9 @@ mod test { use crate::materialized::{ dependencies::pushdown_projection_inexact, - register_materialized, + register_decorator, register_materialized, row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry}, - ListingTableLike, Materialized, + Decorator, ListingTableLike, Materialized, }; use super::{mv_dependencies, stale_files}; @@ -907,10 +907,47 @@ mod test { } } + #[derive(Debug)] + struct DecoratorTable { + inner: Arc, + } + + #[async_trait::async_trait] + impl TableProvider for DecoratorTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + fn table_type(&self) -> TableType { + self.inner.table_type() + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + self.inner.scan(state, projection, filters, limit).await + } + } + + impl Decorator for DecoratorTable { + fn base(&self) -> &dyn TableProvider { + self.inner.as_ref() + } + } + async fn setup() -> Result { let _ = env_logger::builder().is_test(true).try_init(); register_materialized::(); + register_decorator::(); let state = SessionStateBuilder::new() .with_default_features() @@ -1298,15 +1335,18 @@ mod test { context .register_table( case.table_name, - Arc::new(MockMaterializedView { - table_path: case.table_path.clone(), - partition_columns: case - .partition_cols - .iter() - .map(|s| s.to_string()) - .collect(), - query: plan, - file_ext: case.file_extension, + // Register table with a decorator to exercise this functionality + Arc::new(DecoratorTable { + inner: Arc::new(MockMaterializedView { + table_path: case.table_path.clone(), + partition_columns: case + .partition_cols + .iter() + .map(|s| s.to_string()) + .collect(), + query: plan, + file_ext: case.file_extension, + }), }), ) .expect("couldn't register materialized view");