Skip to content

Commit

Permalink
Relax physical schema validation (#14519)
Browse files Browse the repository at this point in the history
Physical plan can be further optimized. In particular, an expression can
be determined as never null even if it wasn't known at the time of
logical planning. Thus, the final schema check needs to be relax,
allowing now-non-null data where nullable data was expected. This
replaces schema equality check, with asymmetric "is satisfied by"
relation.
  • Loading branch information
findepi authored Feb 7, 2025
1 parent 2aad9d2 commit 479a277
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 2 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ pub mod variable {
#[cfg(not(target_arch = "wasm32"))]
pub mod test;

mod schema_equivalence;
pub mod test_util;

#[cfg(doctest)]
Expand Down
8 changes: 6 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;

use crate::schema_equivalence::schema_satisfied_by;
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
Expand Down Expand Up @@ -659,7 +660,10 @@ impl DefaultPhysicalPlanner {
let physical_input_schema_from_logical = logical_input_schema.inner();

if !options.execution.skip_physical_aggregate_schema_check
&& &physical_input_schema != physical_input_schema_from_logical
&& !schema_satisfied_by(
physical_input_schema_from_logical,
&physical_input_schema,
)
{
let mut differences = Vec::new();
if physical_input_schema.fields().len()
Expand Down Expand Up @@ -688,7 +692,7 @@ impl DefaultPhysicalPlanner {
if physical_field.data_type() != logical_field.data_type() {
differences.push(format!("field data type at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.data_type(), logical_field.data_type()));
}
if physical_field.is_nullable() != logical_field.is_nullable() {
if physical_field.is_nullable() && !logical_field.is_nullable() {
differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
}
}
Expand Down
84 changes: 84 additions & 0 deletions datafusion/core/src/schema_equivalence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_schema::{DataType, Field, Fields, Schema};

/// Verifies whether the original planned schema can be satisfied with data
/// adhering to the candidate schema. In practice, this is equality check on the
/// schemas except that original schema can have nullable fields where candidate
/// is constrained to not provide null data.
pub(crate) fn schema_satisfied_by(original: &Schema, candidate: &Schema) -> bool {
original.metadata() == candidate.metadata()
&& fields_satisfied_by(original.fields(), candidate.fields())
}

/// See [`schema_satisfied_by`] for the contract.
fn fields_satisfied_by(original: &Fields, candidate: &Fields) -> bool {
original.len() == candidate.len()
&& original
.iter()
.zip(candidate)
.all(|(original, candidate)| field_satisfied_by(original, candidate))
}

/// See [`schema_satisfied_by`] for the contract.
fn field_satisfied_by(original: &Field, candidate: &Field) -> bool {
original.name() == candidate.name()
&& (original.is_nullable() || !candidate.is_nullable())
&& original.metadata() == candidate.metadata()
&& data_type_satisfied_by(original.data_type(), candidate.data_type())
}

/// See [`schema_satisfied_by`] for the contract.
fn data_type_satisfied_by(original: &DataType, candidate: &DataType) -> bool {
match (original, candidate) {
(DataType::List(original_field), DataType::List(candidate_field)) => {
field_satisfied_by(original_field, candidate_field)
}

(DataType::ListView(original_field), DataType::ListView(candidate_field)) => {
field_satisfied_by(original_field, candidate_field)
}

(
DataType::FixedSizeList(original_field, original_size),
DataType::FixedSizeList(candidate_field, candidate_size),
) => {
original_size == candidate_size
&& field_satisfied_by(original_field, candidate_field)
}

(DataType::LargeList(original_field), DataType::LargeList(candidate_field)) => {
field_satisfied_by(original_field, candidate_field)
}

(
DataType::LargeListView(original_field),
DataType::LargeListView(candidate_field),
) => field_satisfied_by(original_field, candidate_field),

(DataType::Struct(original_fields), DataType::Struct(candidate_fields)) => {
fields_satisfied_by(original_fields, candidate_fields)
}

// TODO (DataType::Union(, _), DataType::Union(_, _)) => {}
// TODO (DataType::Dictionary(_, _), DataType::Dictionary(_, _)) => {}
// TODO (DataType::Map(_, _), DataType::Map(_, _)) => {}
// TODO (DataType::RunEndEncoded(_, _), DataType::RunEndEncoded(_, _)) => {}
_ => original == candidate,
}
}
10 changes: 10 additions & 0 deletions datafusion/sqllogictest/test_files/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -850,3 +850,13 @@ FROM (
----
NULL false
foo true

query T
SELECT combined
FROM (
SELECT concat('A', 'B') AS combined UNION ALL
SELECT concat('A', 'B') AS combined
)
GROUP BY combined
----
AB

0 comments on commit 479a277

Please sign in to comment.