Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Decorator trait #26

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions src/materialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ pub fn register_listing_table<T: ListingTableLike>() {
/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
/// If the table's type has not been registered using [`register_listing_table`], will return `None`.
pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
TABLE_TYPE_REGISTRY
.cast_to_listing_table(table)
.or_else(|| {
TABLE_TYPE_REGISTRY
.cast_to_decorator(table)
.and_then(|decorator| cast_to_listing_table(decorator.base()))
})
}

/// A hive-partitioned table in object storage that is defined by a user-provided query.
Expand All @@ -110,7 +116,24 @@ pub fn register_materialized<T: Materialized>() {
/// Attempt to cast the given TableProvider into a [`Materialized`].
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
TABLE_TYPE_REGISTRY.cast_to_materialized(table)
TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| {
TABLE_TYPE_REGISTRY
.cast_to_decorator(table)
.and_then(|decorator| cast_to_materialized(decorator.base()))
})
}

/// A `TableProvider` that decorates other `TableProvider`s.
/// Sometimes users may implement a `TableProvider` that overrides functionality of a base `TableProvider`.
/// This API allows the decorator to also be recognized as `ListingTableLike` or `Materialized` automatically.
pub trait Decorator: TableProvider + 'static {
/// The underlying `TableProvider` that this decorator wraps.
fn base(&self) -> &dyn TableProvider;
}

/// Register `T` as a [`Decorator`].
pub fn register_decorator<T: Decorator>() {
TABLE_TYPE_REGISTRY.register_decorator::<T>()
}

type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
Expand All @@ -123,6 +146,7 @@ type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
struct TableTypeRegistry {
listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
decorator_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Decorator>)>,
}

impl Debug for TableTypeRegistry {
Expand All @@ -145,6 +169,7 @@ impl Default for TableTypeRegistry {
let new = Self {
listing_table_accessors: DashMap::new(),
materialized_accessors: DashMap::new(),
decorator_accessors: DashMap::new(),
};
new.register_listing_table::<ListingTable>();

Expand Down Expand Up @@ -175,6 +200,16 @@ impl TableTypeRegistry {
self.register_listing_table::<T>();
}

fn register_decorator<T: Decorator>(&self) {
self.decorator_accessors.insert(
TypeId::of::<T>(),
(
type_name::<T>(),
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Decorator)),
),
);
}

fn cast_to_listing_table<'a>(
&'a self,
table: &'a dyn TableProvider,
Expand All @@ -192,4 +227,10 @@ impl TableTypeRegistry {
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}

fn cast_to_decorator<'a>(&'a self, table: &'a dyn TableProvider) -> Option<&'a dyn Decorator> {
self.decorator_accessors
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}
}
66 changes: 53 additions & 13 deletions src/materialized/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ impl TableFunctionImpl for FileDependenciesUdtf {
let table = util::get_table(self.catalog_list.as_ref(), &table_ref)
.map_err(|e| DataFusionError::Plan(e.to_string()))?;

let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(
"mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized".to_string(),
let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(format!(
"mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"),
))?;

Ok(Arc::new(ViewTable::try_new(
Expand Down Expand Up @@ -846,9 +846,9 @@ mod test {

use crate::materialized::{
dependencies::pushdown_projection_inexact,
register_materialized,
register_decorator, register_materialized,
row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry},
ListingTableLike, Materialized,
Decorator, ListingTableLike, Materialized,
};

use super::{mv_dependencies, stale_files};
Expand Down Expand Up @@ -907,10 +907,47 @@ mod test {
}
}

#[derive(Debug)]
struct DecoratorTable {
inner: Arc<dyn TableProvider>,
}

#[async_trait::async_trait]
impl TableProvider for DecoratorTable {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.inner.schema()
}

fn table_type(&self) -> TableType {
self.inner.table_type()
}

async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
self.inner.scan(state, projection, filters, limit).await
}
}

impl Decorator for DecoratorTable {
fn base(&self) -> &dyn TableProvider {
self.inner.as_ref()
}
}

async fn setup() -> Result<SessionContext> {
let _ = env_logger::builder().is_test(true).try_init();

register_materialized::<MockMaterializedView>();
register_decorator::<DecoratorTable>();

let state = SessionStateBuilder::new()
.with_default_features()
Expand Down Expand Up @@ -1298,15 +1335,18 @@ mod test {
context
.register_table(
case.table_name,
Arc::new(MockMaterializedView {
table_path: case.table_path.clone(),
partition_columns: case
.partition_cols
.iter()
.map(|s| s.to_string())
.collect(),
query: plan,
file_ext: case.file_extension,
// Register table with a decorator to exercise this functionality
Arc::new(DecoratorTable {
inner: Arc::new(MockMaterializedView {
table_path: case.table_path.clone(),
partition_columns: case
.partition_cols
.iter()
.map(|s| s.to_string())
.collect(),
query: plan,
file_ext: case.file_extension,
}),
}),
)
.expect("couldn't register materialized view");
Expand Down
Loading