Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interface for physical plan invariant checking. #13986

Merged
merged 12 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 328 additions & 13 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow_array::builder::StringBuilder;
use arrow_array::RecordBatch;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
ScalarValue,
Expand All @@ -82,6 +83,7 @@ use datafusion_expr::{
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;
Expand Down Expand Up @@ -1874,33 +1876,35 @@ impl DefaultPhysicalPlanner {
displayable(plan.as_ref()).indent(true)
);

let mut new_plan = plan;
// This runs once before any optimization,
// to verify that the plan fulfills the base requirements.
InvariantChecker(InvariantLevel::Always).check(&plan)?;

let mut new_plan = Arc::clone(&plan);
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer
.optimize(new_plan, session_state.config_options())
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;
if optimizer.schema_check() && new_plan.schema() != before_schema {
let e = DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
optimizer.name(),
before_schema,
new_plan.schema()
));
return Err(DataFusionError::Context(
optimizer.name().to_string(),
Box::new(e),
));
}

// This only checks the schema in release build, and performs additional checks in debug mode.
OptimizationInvariantChecker::new(optimizer)
.check(&new_plan, before_schema)?;

trace!(
"Optimized physical plan by {}:\n{}\n",
optimizer.name(),
displayable(new_plan.as_ref()).indent(false)
);
observer(new_plan.as_ref(), optimizer.as_ref())
}

// This runs once after all optimizer runs are complete,
// to verify that the plan is executable.
InvariantChecker(InvariantLevel::Executable).check(&new_plan)?;

debug!(
"Optimized physical plan:\n{}\n",
displayable(new_plan.as_ref()).indent(false)
Expand Down Expand Up @@ -2008,6 +2012,81 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
}
}

struct OptimizationInvariantChecker<'a> {
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
}

impl<'a> OptimizationInvariantChecker<'a> {
/// Create an [`OptimizationInvariantChecker`] that performs checking per tule.
pub fn new(rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>) -> Self {
Self { rule }
}

/// Checks that the plan change is permitted, returning an Error if not.
///
/// Conditionally performs schema checks per [PhysicalOptimizerRule::schema_check].
/// In debug mode, this recursively walks the entire physical plan
/// and performs [`ExecutionPlan::check_invariants`].
pub fn check(
&mut self,
plan: &Arc<dyn ExecutionPlan>,
previous_schema: Arc<Schema>,
) -> Result<()> {
// if the rule is not permitted to change the schema, confirm that it did not change.
if self.rule.schema_check() && plan.schema() != previous_schema {
internal_err!("PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
wiedld marked this conversation as resolved.
Show resolved Hide resolved
self.rule.name(),
previous_schema,
plan.schema()
)?
}

// check invariants per each ExecutionPlan node
#[cfg(debug_assertions)]
plan.visit(self)?;

Ok(())
}
}

impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
// Checks for the more permissive `InvariantLevel::Always`.
// Plans are not guarenteed to be executable after each physical optimizer run.
node.check_invariants(InvariantLevel::Always).map_err(|e| e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name())))?;
wiedld marked this conversation as resolved.
Show resolved Hide resolved
Ok(TreeNodeRecursion::Continue)
}
}

/// Check [`ExecutionPlan`] invariants per [`InvariantLevel`].
struct InvariantChecker(InvariantLevel);

impl InvariantChecker {
/// Checks that the plan is executable, returning an Error if not.
pub fn check(&mut self, plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
// check invariants per each ExecutionPlan node
plan.visit(self)?;

Ok(())
}
}

impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
node.check_invariants(self.0).map_err(|e| {
e.context(format!(
"Invariant for ExecutionPlan node '{}' failed",
node.name()
))
})?;
Ok(TreeNodeRecursion::Continue)
}
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand All @@ -2028,6 +2107,7 @@ mod tests {
use crate::execution::session_state::SessionStateBuilder;
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, DFSchemaRef, TableReference};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -2782,4 +2862,239 @@ digraph {

assert_contains!(generated_graph, expected_tooltip);
}

/// Extension Node which passes invariant checks
#[derive(Debug)]
struct OkExtensionNode(Vec<Arc<dyn ExecutionPlan>>);
impl ExecutionPlan for OkExtensionNode {
fn name(&self) -> &str {
"always ok"
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self(children)))
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for OkExtensionNode {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

/// Extension Node which fails the [`OptimizationInvariantChecker`].
#[derive(Debug)]
struct InvariantFailsExtensionNode;
impl ExecutionPlan for InvariantFailsExtensionNode {
fn name(&self) -> &str {
"InvariantFailsExtensionNode"
}
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
match check {
InvariantLevel::Always => plan_err!("extension node failed it's user-defined always-invariant check"),
InvariantLevel::Executable => panic!("the OptimizationInvariantChecker should not be checking for executableness"),
}
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for InvariantFailsExtensionNode {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

/// Extension Optimizer rule that requires the schema check
#[derive(Debug)]
struct OptimizerRuleWithSchemaCheck;
impl PhysicalOptimizerRule for OptimizerRuleWithSchemaCheck {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(plan)
}
fn name(&self) -> &str {
"OptimizerRuleWithSchemaCheck"
}
fn schema_check(&self) -> bool {
true
}
}

#[test]
fn test_optimization_invariant_checker() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(OptimizerRuleWithSchemaCheck);

// ok plan
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
let child = Arc::clone(&ok_node);
let ok_plan = Arc::clone(&ok_node).with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?,
Arc::clone(&child),
])?;

// Test: check should pass with same schema
let equal_schema = ok_plan.schema();
OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?;

// Test: should fail with schema changed
let different_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&ok_plan, different_schema)
.unwrap_err();
assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed, due to generate a different schema"));

// Test: should fail when extension node fails it's own invariant check
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&failing_node, ok_plan.schema())
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined always-invariant check"));

// Test: should fail when descendent extension node fails
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
let invalid_plan = ok_node.with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
Arc::clone(&child),
])?;
let expected_err = OptimizationInvariantChecker::new(&rule)
.check(&invalid_plan, ok_plan.schema())
.unwrap_err();
assert!(expected_err
.to_string()
.contains("extension node failed it's user-defined always-invariant check"));

Ok(())
}

/// Extension Node which fails the [`InvariantChecker`]
/// if, and only if, [`InvariantLevel::Executable`]
#[derive(Debug)]
struct ExecutableInvariantFails;
impl ExecutionPlan for ExecutableInvariantFails {
fn name(&self) -> &str {
"ExecutableInvariantFails"
}
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
match check {
InvariantLevel::Always => Ok(()),
InvariantLevel::Executable => plan_err!(
"extension node failed it's user-defined executable-invariant check"
),
}
}
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn properties(&self) -> &PlanProperties {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}
}
impl DisplayAs for ExecutableInvariantFails {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name())
}
}

#[test]
fn test_invariant_checker_levels() -> Result<()> {
// plan that passes the always-invariant, but fails the executable check
let plan: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);

// Test: check should pass with less stringent Always check
InvariantChecker(InvariantLevel::Always).check(&plan)?;

// Test: should fail the executable check
let expected_err = InvariantChecker(InvariantLevel::Executable)
.check(&plan)
.unwrap_err();
assert!(expected_err.to_string().contains(
"extension node failed it's user-defined executable-invariant check"
));

// Test: should fail when descendent extension node fails
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
let child = Arc::clone(&ok_node);
let plan = ok_node.with_new_children(vec![
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
Arc::clone(&child),
])?;
let expected_err = InvariantChecker(InvariantLevel::Executable)
.check(&plan)
.unwrap_err();
assert!(expected_err.to_string().contains(
"extension node failed it's user-defined executable-invariant check"
));

Ok(())
}
}
Loading
Loading