Skip to content

Commit

Permalink
Add FileMetadata table and RowMetadataRegistry (#2)
Browse files Browse the repository at this point in the history
* port file metadata + hive_partition + row metadata + add tests

* better docs

* more comments

* fix link

* fix another comment

* allow the same licenses as datafusion-orc

* add unicode-3.0 license to allowlist
  • Loading branch information
suremarc authored Dec 26, 2024
1 parent 759aa19 commit b7974e8
Show file tree
Hide file tree
Showing 8 changed files with 1,690 additions and 11 deletions.
25 changes: 24 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@ authors = ["Matthew Cramerus <[email protected]>"]
license = "Apache-2.0"
description = "Materialized Views & Query Rewriting in DataFusion"
keywords = ["arrow", "arrow-rs", "datafusion"]
rust-version = "1.73"
rust-version = "1.80"

[dependencies]
arrow = "53"
arrow-schema = "53"
async-trait = "0.1"
dashmap = "6"
datafusion = "43"
datafusion-common = "43"
datafusion-expr = "43"
datafusion-functions = "43"
datafusion-functions-aggregate = "43"
datafusion-physical-expr = "43"
datafusion-physical-plan = "43"
datafusion-sql = "43"
futures = "0.3"
itertools = "0.13"
log = "0.4"
object_store = "0.11"

[dev-dependencies]
anyhow = "1.0.95"
env_logger = "0.11.6"
tempfile = "3.14.0"
tokio = "1.42.0"
url = "2.5.4"
10 changes: 9 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,13 @@
# under the License.

[licenses]
allow = ["Apache-2.0"]
allow = [
"Apache-2.0",
"Apache-2.0 WITH LLVM-exception",
"MIT",
"BSD-2-Clause",
"BSD-3-Clause",
"CC0-1.0",
"Unicode-3.0"
]
version = 2
26 changes: 17 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@
// specific language governing permissions and limitations
// under the License.

/// Code for incremental view maintenance.
mod materialized;
#![deny(missing_docs)]

/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
mod rewrite;
//! `datafusion-materialized-views` implements algorithms and functionality for materialized views in DataFusion.
/// Code for incremental view maintenance against Hive-partitioned tables.
///
/// An example of a Hive-partitioned table is the [`ListingTable`](datafusion::datasource::listing::ListingTable).
/// By analyzing the fragment of the materialized view query pertaining to the partition columns,
/// we can derive a build graph that relates the files of a materialized view and the files of the tables it depends on.
///
/// A central trait is defined for Hive-partitioned tables, [`ListingTableLike`](materialized::ListingTableLike). Note that
/// all implementations of [`ListingTableLike`](materialized::ListingTableLike) must be registered using the
/// [`register_listing_table`](materialized::register_listing_table) function, otherwise the tables may not be detected by
/// the incremental view maintenance code, including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata)
/// or [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry).
pub mod materialized;

#[cfg(test)]
mod test {
#[test]
fn test_it_works() {}
}
/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
pub mod rewrite;
138 changes: 138 additions & 0 deletions src/materialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,141 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Track dependencies of materialized data in object storage
mod dependencies;

/// Pluggable metadata sources for incremental view maintenance
pub mod row_metadata;

/// A virtual table that exposes files in object storage.
pub mod file_metadata;

/// A UDF that parses Hive partition elements from object storage paths.
mod hive_partition;

use std::{
any::{type_name, Any, TypeId},
fmt::Debug,
sync::{Arc, LazyLock},
};

use dashmap::DashMap;
use datafusion::{
catalog::TableProvider,
datasource::listing::{ListingTable, ListingTableUrl},
};
use datafusion_expr::LogicalPlan;
use itertools::Itertools;

/// The identifier of the column that [`RowMetadataSource`](row_metadata::RowMetadataSource) implementations should store row metadata in.
pub const META_COLUMN: &str = "__meta";

/// Process-wide registry of known [`ListingTableLike`] implementations.
///
/// Built lazily on first access via `TableTypeRegistry::default`, which
/// pre-registers [`ListingTable`]. The free functions [`register_listing_table`]
/// and [`cast_to_listing_table`] both delegate to this instance.
static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default);

/// A [`TableProvider`] whose data is backed by Hive-partitioned files in object storage.
///
/// A type implementing this trait is only recognised by [`cast_to_listing_table`]
/// after it has been registered with [`register_listing_table`].
pub trait ListingTableLike: TableProvider + 'static {
    /// Returns the object store URLs for this table.
    fn table_paths(&self) -> Vec<ListingTableUrl>;

    /// Returns the names of the Hive partition columns.
    fn partition_columns(&self) -> Vec<String>;

    /// Returns the file extension used by this listing table.
    fn file_ext(&self) -> String;
}

impl ListingTableLike for ListingTable {
    fn table_paths(&self) -> Vec<ListingTableUrl> {
        // Method-call syntax prefers `ListingTable`'s own inherent `table_paths`
        // accessor over this trait method, so this is not a recursive call; the
        // result is copied into an owned `Vec`.
        self.table_paths().to_vec()
    }

    fn partition_columns(&self) -> Vec<String> {
        // Each partition column is a `(name, data type)` pair; only the name is kept.
        let partition_cols = &self.options().table_partition_cols;
        partition_cols
            .iter()
            .map(|(col_name, _)| col_name.clone())
            .collect()
    }

    fn file_ext(&self) -> String {
        let listing_options = self.options();
        listing_options.file_extension.clone()
    }
}

/// Register a [`ListingTableLike`] implementation in this registry.
/// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
/// into a [`ListingTableLike`] where possible.
pub fn register_listing_table<T: ListingTableLike>() {
TABLE_TYPE_REGISTRY.register_listing_table::<T>();
}

/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
/// If the table's type has not been registered using [`register_listing_table`], will return `None`.
pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
}

/// A Hive-partitioned table in object storage whose contents are defined by a
/// user-provided query.
pub trait Materialized: ListingTableLike {
    /// Returns the logical plan of the query that defines this materialized view.
    fn query(&self) -> LogicalPlan;
}

/// A shareable, thread-safe function that tries to view a `&dyn Any` as a `&T`,
/// returning `None` when the value cannot be viewed as `T`.
type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;

/// A registry of [`ListingTableLike`] implementations, used to downcast
/// arbitrary TableProviders into `dyn ListingTableLike` where possible.
///
/// The crate keeps a single shared instance of this registry holding every known
/// implementation of `ListingTableLike`. [`ListingTable`] is registered by default.
struct TableTypeRegistry {
    /// Maps the [`TypeId`] of each registered type to its type name (used for
    /// `Debug` output) and the function that downcasts to `dyn ListingTableLike`.
    listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
}

impl Debug for TableTypeRegistry {
    /// Formats the registry as a struct whose single field lists the names of
    /// the registered types.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the stored type names are printed; the downcast functions are skipped.
        let registered_type_names = self
            .listing_table_accessors
            .iter()
            .map(|entry| entry.value().0)
            .collect_vec();

        f.debug_struct("TableTypeRegistry")
            .field("listing_table_accessors", &registered_type_names)
            .finish()
    }
}

impl Default for TableTypeRegistry {
    /// Creates a registry that already knows about [`ListingTable`].
    fn default() -> Self {
        let registry = Self {
            listing_table_accessors: DashMap::new(),
        };
        // Registration only needs `&self`, so the binding does not have to be mutable.
        registry.register_listing_table::<ListingTable>();
        registry
    }
}

impl TableTypeRegistry {
fn register_listing_table<T: ListingTableLike>(&self) {
self.listing_table_accessors.insert(
TypeId::of::<T>(),
(
type_name::<T>(),
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)),
),
);
}

fn cast_to_listing_table<'a>(
&'a self,
table: &'a dyn TableProvider,
) -> Option<&'a dyn ListingTableLike> {
self.listing_table_accessors
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}
}
16 changes: 16 additions & 0 deletions src/materialized/dependencies.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
Loading

0 comments on commit b7974e8

Please sign in to comment.