Fix 2 bugs related to push down partition filters (#12902)
* Report errors in partition filters

This patch fixes two bugs: errors in partition filters are silently ignored, and partition filters are allowed to be pushed down for unpartitioned tables even though such filters are never evaluated.

The first bug is fixed by reporting errors from partition filters and by only evaluating the filters that were accepted as partition filters in
`supports_filters_pushdown`.
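
To illustrate the shape of the first fix, here is a minimal, self-contained sketch (not DataFusion's actual code: the `and_all` helper is hypothetical, and plain `Result<bool, String>` values stand in for the evaluated Arrow boolean arrays). Per-filter results are AND-ed together and the first evaluation error is surfaced instead of being silently dropped.

```rust
// Minimal sketch: AND together per-filter results, surfacing the first error
// instead of ignoring filters that fail to evaluate.
fn and_all(results: Vec<Result<bool, String>>) -> Option<Result<bool, String>> {
    results.into_iter().reduce(|a, b| Ok(a? && b?))
}

fn main() {
    // All filters evaluate cleanly: the conjunction is their logical AND.
    assert_eq!(and_all(vec![Ok(true), Ok(false)]), Some(Ok(false)));

    // One filter fails: the error is reported rather than swallowed.
    assert_eq!(
        and_all(vec![Ok(true), Err("Divide by zero error".into())]),
        Some(Err("Divide by zero error".into()))
    );

    // No filters at all: there is nothing to prune on.
    assert_eq!(and_all(vec![]), None);
}
```

This mirrors the `map`/`reduce` change to `prune_partitions` in `helpers.rs` below, where `and` operates on Arrow boolean arrays rather than plain booleans.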

The second bug is fixed by only allowing partition filters to be pushed
down when the table actually has partition columns.
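
The guard behind the second fix can be sketched as follows (a simplified stand-in, not DataFusion's API: the `Expr` struct, its `referenced_columns` field, and the `can_prune_partitions` name are placeholders for the `can_be_evaluted_for_partition_pruning` helper added to `table.rs` below). A filter only qualifies for partition pruning when partition columns exist and the expression references nothing but those columns.

```rust
// Placeholder expression type: just the set of columns a filter references.
struct Expr {
    referenced_columns: Vec<String>,
}

// A filter may be pushed down as a partition filter only when partition
// columns exist and the expression touches those columns exclusively.
fn can_prune_partitions(partition_cols: &[&str], expr: &Expr) -> bool {
    !partition_cols.is_empty()
        && expr
            .referenced_columns
            .iter()
            .all(|c| partition_cols.contains(&c.as_str()))
}

fn main() {
    let filter = Expr {
        referenced_columns: vec!["part".to_string()],
    };
    // Unpartitioned table: never claim the filter as a partition filter.
    assert!(!can_prune_partitions(&[], &filter));
    // Partitioned table and the filter only uses partition columns: exact pushdown.
    assert!(can_prune_partitions(&["part"], &filter));
}
```

The empty-columns check is what keeps `supports_filters_pushdown` from claiming `Exact` pushdown on an unpartitioned table, which previously left such filters unevaluated.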

* Update datafusion/sqllogictest/test_files/errors.slt
eejbyfeldt authored Oct 17, 2024
1 parent 1ba1e53 commit b098893
Showing 5 changed files with 65 additions and 53 deletions.
4 changes: 1 addition & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -2987,9 +2987,7 @@ mod tests {
JoinType::Inner,
Some(Expr::Literal(ScalarValue::Null)),
)?;
let expected_plan = "CrossJoin:\
\n TableScan: a projection=[c1], full_filters=[Boolean(NULL)]\
\n TableScan: b projection=[c1]";
let expected_plan = "EmptyRelation";
assert_eq!(expected_plan, format!("{}", join.into_optimized_plan()?));

// JOIN ON expression must be boolean type
36 changes: 18 additions & 18 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
use super::ListingTableUrl;
use super::PartitionedFile;
use crate::execution::context::SessionState;
+ use datafusion_common::internal_err;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{BinaryExpr, Operator};

@@ -285,25 +286,20 @@ async fn prune_partitions(
let props = ExecutionProps::new();

// Applies `filter` to `batch` returning `None` on error
- let do_filter = |filter| -> Option<ArrayRef> {
- let expr = create_physical_expr(filter, &df_schema, &props).ok()?;
- expr.evaluate(&batch)
- .ok()?
- .into_array(partitions.len())
- .ok()
+ let do_filter = |filter| -> Result<ArrayRef> {
+ let expr = create_physical_expr(filter, &df_schema, &props)?;
+ expr.evaluate(&batch)?.into_array(partitions.len())
};

- // Compute the conjunction of the filters, ignoring errors
+ // Compute the conjunction of the filters
let mask = filters
.iter()
- .fold(None, |acc, filter| match (acc, do_filter(filter)) {
- (Some(a), Some(b)) => Some(and(&a, b.as_boolean()).unwrap_or(a)),
- (None, Some(r)) => Some(r.as_boolean().clone()),
- (r, None) => r,
- });
+ .map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
+ .reduce(|a, b| Ok(and(&a?, &b?)?));

let mask = match mask {
- Some(mask) => mask,
+ Some(Ok(mask)) => mask,
+ Some(Err(err)) => return Err(err),
None => return Ok(partitions),
};

@@ -401,8 +397,8 @@ fn evaluate_partition_prefix<'a>(

/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
- /// `filters` might contain expressions that can be resolved only at the
- /// file level (e.g. Parquet row group pruning).
+ /// `filters` should only contain expressions that can be evaluated
+ /// using only the partition columns.
pub async fn pruned_partition_list<'a>(
ctx: &'a SessionState,
store: &'a dyn ObjectStore,
@@ -413,6 +409,12 @@ pub async fn pruned_partition_list<'a>(
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
// if no partition col => simply list all the files
if partition_cols.is_empty() {
+ if !filters.is_empty() {
+ return internal_err!(
+ "Got partition filters for unpartitioned table {}",
+ table_path
+ );
+ }
return Ok(Box::pin(
table_path
.list_all_files(ctx, store, file_extension)
@@ -631,13 +633,11 @@ mod tests {
]);
let filter1 = Expr::eq(col("part1"), lit("p1v2"));
let filter2 = Expr::eq(col("part2"), lit("p2v1"));
- // filter3 cannot be resolved at partition pruning
- let filter3 = Expr::eq(col("part2"), col("other"));
let pruned = pruned_partition_list(
&state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
- &[filter1, filter2, filter3],
+ &[filter1, filter2],
".parquet",
&[
(String::from("part1"), DataType::Utf8),
69 changes: 37 additions & 32 deletions datafusion/core/src/datasource/listing/table.rs
@@ -782,6 +782,16 @@ impl ListingTable {
}
}

+ // Expressions can be used for partition pruning if they can be evaluated using
+ // only the partition columns and there are partition columns.
+ fn can_be_evaluted_for_partition_pruning(
+ partition_column_names: &[&str],
+ expr: &Expr,
+ ) -> bool {
+ !partition_column_names.is_empty()
+ && expr_applicable_for_cols(partition_column_names, expr)
+ }
+
#[async_trait]
impl TableProvider for ListingTable {
fn as_any(&self) -> &dyn Any {
@@ -807,10 +817,28 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ // extract types of partition columns
+ let table_partition_cols = self
+ .options
+ .table_partition_cols
+ .iter()
+ .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
+ .collect::<Result<Vec<_>>>()?;
+
+ let table_partition_col_names = table_partition_cols
+ .iter()
+ .map(|field| field.name().as_str())
+ .collect::<Vec<_>>();
+ // If the filters can be resolved using only partition cols, there is no need to
+ // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
+ let (partition_filters, filters): (Vec<_>, Vec<_>) =
+ filters.iter().cloned().partition(|filter| {
+ can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
+ });
// TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
let (mut partitioned_file_lists, statistics) = self
- .list_files_for_scan(session_state, filters, limit)
+ .list_files_for_scan(session_state, &partition_filters, limit)
.await?;

// if no files need to be read, return an `EmptyExec`
@@ -846,28 +874,6 @@ impl TableProvider for ListingTable {
None => {} // no ordering required
};

- // extract types of partition columns
- let table_partition_cols = self
- .options
- .table_partition_cols
- .iter()
- .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
- .collect::<Result<Vec<_>>>()?;
-
- // If the filters can be resolved using only partition cols, there is no need to
- // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
- let table_partition_col_names = table_partition_cols
- .iter()
- .map(|field| field.name().as_str())
- .collect::<Vec<_>>();
- let filters = filters
- .iter()
- .filter(|filter| {
- !expr_applicable_for_cols(&table_partition_col_names, filter)
- })
- .cloned()
- .collect::<Vec<_>>();
-
let filters = conjunction(filters.to_vec())
.map(|expr| -> Result<_> {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
@@ -908,18 +914,17 @@
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
+ let partition_column_names = self
+ .options
+ .table_partition_cols
+ .iter()
+ .map(|col| col.0.as_str())
+ .collect::<Vec<_>>();
filters
.iter()
.map(|filter| {
- if expr_applicable_for_cols(
- &self
- .options
- .table_partition_cols
- .iter()
- .map(|col| col.0.as_str())
- .collect::<Vec<_>>(),
- filter,
- ) {
+ if can_be_evaluted_for_partition_pruning(&partition_column_names, filter)
+ {
// if filter can be handled by partition pruning, it is exact
return Ok(TableProviderFilterPushDown::Exact);
}
5 changes: 5 additions & 0 deletions datafusion/sqllogictest/test_files/arrow_files.slt
@@ -118,3 +118,8 @@ EXPLAIN SELECT f0 FROM arrow_partitioned WHERE part = 456
----
logical_plan TableScan: arrow_partitioned projection=[f0], full_filters=[arrow_partitioned.part = Int32(456)]
physical_plan ArrowExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_arrow/part=456/data.arrow]]}, projection=[f0]
+
+
+ # Errors in partition filters should be reported
+ query error Divide by zero error
+ SELECT f0 FROM arrow_partitioned WHERE CASE WHEN true THEN 1 / 0 ELSE part END = 1;
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/errors.slt
@@ -133,3 +133,7 @@ create table foo as values (1), ('foo');

query error No function matches
select 1 group by substr('');
+
+ # Error in filter should be reported
+ query error Divide by zero
+ SELECT c2 from aggregate_test_100 where CASE WHEN true THEN 1 / 0 ELSE 0 END = 1;
