Skip to content

Commit

Permalink
port MV dependency code
Browse files Browse the repository at this point in the history
  • Loading branch information
suremarc committed Dec 26, 2024
1 parent b7974e8 commit 371066d
Show file tree
Hide file tree
Showing 7 changed files with 1,687 additions and 81 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ datafusion-common = "43"
datafusion-expr = "43"
datafusion-functions = "43"
datafusion-functions-aggregate = "43"
datafusion-optimizer = "43"
datafusion-physical-expr = "43"
datafusion-physical-plan = "43"
datafusion-sql = "43"
Expand Down
43 changes: 42 additions & 1 deletion src/materialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

/// Track dependencies of materialized data in object storage
mod dependencies;
pub mod dependencies;

/// Pluggable metadata sources for incremental view maintenance
pub mod row_metadata;
Expand All @@ -27,6 +27,9 @@ pub mod file_metadata;
/// A UDF that parses Hive partition elements from object storage paths.
mod hive_partition;

/// Some private utility functions
mod util;

use std::{
any::{type_name, Any, TypeId},
fmt::Debug,
Expand Down Expand Up @@ -95,6 +98,21 @@ pub trait Materialized: ListingTableLike {
fn query(&self) -> LogicalPlan;
}

/// Register a [`Materialized`] implementation in this registry.
/// This allows `cast_to_materialized` to easily downcast a [`TableProvider`]
/// into a [`Materialized`] where possible.
///
/// Note that this will also register `T` as a [`ListingTableLike`].
pub fn register_materialized<T: Materialized>() {
TABLE_TYPE_REGISTRY.register_materialized::<T>();
}

/// Attempt to cast the given TableProvider into a [`Materialized`].
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
TABLE_TYPE_REGISTRY.cast_to_materialized(table)
}

type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;

/// A registry for implementations of [`ListingTableLike`], used for downcasting
Expand All @@ -104,6 +122,7 @@ type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
/// By default, [`ListingTable`] is registered.
struct TableTypeRegistry {
listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
}

impl Debug for TableTypeRegistry {
Expand All @@ -125,6 +144,7 @@ impl Default for TableTypeRegistry {
fn default() -> Self {
let new = Self {
listing_table_accessors: DashMap::new(),
materialized_accessors: DashMap::new(),
};
new.register_listing_table::<ListingTable>();

Expand All @@ -143,6 +163,18 @@ impl TableTypeRegistry {
);
}

fn register_materialized<T: Materialized>(&self) {
self.materialized_accessors.insert(
TypeId::of::<T>(),
(
type_name::<T>(),
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
),
);

self.register_listing_table::<T>();
}

fn cast_to_listing_table<'a>(
&'a self,
table: &'a dyn TableProvider,
Expand All @@ -151,4 +183,13 @@ impl TableTypeRegistry {
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}

fn cast_to_materialized<'a>(
&'a self,
table: &'a dyn TableProvider,
) -> Option<&'a dyn Materialized> {
self.materialized_accessors
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}
}
Loading

0 comments on commit 371066d

Please sign in to comment.