From 8a895d5aa572bfda39be5234b47281ebc954f8b6 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Thu, 30 Jan 2025 22:42:43 +0900 Subject: [PATCH 01/12] Fix: Avoid recursive external error wrapping in type conversion. --- datafusion/common/src/error.rs | 38 ++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index f7c247aaf288..e1d23bc2b1a6 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -293,7 +293,14 @@ impl From for DataFusionError { impl From for DataFusionError { fn from(err: GenericError) -> Self { - DataFusionError::External(err) + // If the error is already a DataFusionError, return it directly rather than wrapping it again. + if err.is::() { + if let Ok(e) = err.downcast::() { + *e + } else { unreachable!() } + } else { + DataFusionError::External(err) + } } } @@ -656,7 +663,7 @@ pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionE mod test { use std::sync::Arc; - use crate::error::DataFusionError; + use crate::error::{DataFusionError, GenericError}; use arrow::error::ArrowError; #[test] @@ -810,6 +817,33 @@ mod test { ); } + #[test] + fn external_error() { + // assert not wrapping DataFusionError + let generic_error: GenericError = Box::new(DataFusionError::Plan("test".to_string())); + let datafusion_error: DataFusionError = generic_error.into(); + println!("{}", datafusion_error.strip_backtrace()); + assert_eq!(datafusion_error.strip_backtrace(), "Error during planning: test"); + + // assert wrapping other Error + let generic_error: GenericError = Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error")); + let datafusion_error: DataFusionError = generic_error.into(); + println!("{}", datafusion_error.strip_backtrace()); + assert_eq!(datafusion_error.strip_backtrace(), "External error: io error"); + } + + #[test] + fn external_error_no_recursive() { + let generic_error_1: GenericError = 
Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error")); + let external_error_1 : DataFusionError = generic_error_1.into(); + let generic_error_2 : GenericError = Box::new(external_error_1); + let external_error_2: DataFusionError = generic_error_2.into(); + + println!("{}", external_error_2); + assert!(external_error_2.to_string().starts_with("External error: io error")); + } + + /// Model what happens when implementing SendableRecordBatchStream: /// DataFusion code needs to return an ArrowError fn return_arrow_error() -> arrow::error::Result<()> { From dc9dd5e019768d2a0d7ce6b9ecc31534736ea737 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Thu, 30 Jan 2025 22:46:21 +0900 Subject: [PATCH 02/12] cargo fmt --- datafusion/common/src/error.rs | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index e1d23bc2b1a6..ccfd079bc780 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -297,7 +297,9 @@ impl From for DataFusionError { if err.is::() { if let Ok(e) = err.downcast::() { *e - } else { unreachable!() } + } else { + unreachable!() + } } else { DataFusionError::External(err) } @@ -820,30 +822,40 @@ mod test { #[test] fn external_error() { // assert not wrapping DataFusionError - let generic_error: GenericError = Box::new(DataFusionError::Plan("test".to_string())); + let generic_error: GenericError = + Box::new(DataFusionError::Plan("test".to_string())); let datafusion_error: DataFusionError = generic_error.into(); println!("{}", datafusion_error.strip_backtrace()); - assert_eq!(datafusion_error.strip_backtrace(), "Error during planning: test"); + assert_eq!( + datafusion_error.strip_backtrace(), + "Error during planning: test" + ); // assert wrapping other Error - let generic_error: GenericError = Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error")); + let generic_error: 
GenericError = + Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error")); let datafusion_error: DataFusionError = generic_error.into(); println!("{}", datafusion_error.strip_backtrace()); - assert_eq!(datafusion_error.strip_backtrace(), "External error: io error"); + assert_eq!( + datafusion_error.strip_backtrace(), + "External error: io error" + ); } #[test] fn external_error_no_recursive() { - let generic_error_1: GenericError = Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error")); - let external_error_1 : DataFusionError = generic_error_1.into(); - let generic_error_2 : GenericError = Box::new(external_error_1); + let generic_error_1: GenericError = + Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error")); + let external_error_1: DataFusionError = generic_error_1.into(); + let generic_error_2: GenericError = Box::new(external_error_1); let external_error_2: DataFusionError = generic_error_2.into(); println!("{}", external_error_2); - assert!(external_error_2.to_string().starts_with("External error: io error")); + assert!(external_error_2 + .to_string() + .starts_with("External error: io error")); } - /// Model what happens when implementing SendableRecordBatchStream: /// DataFusion code needs to return an ArrowError fn return_arrow_error() -> arrow::error::Result<()> { From 127705e2e05523b66ba8416eeac3226217e39f05 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Fri, 31 Jan 2025 19:13:51 +0900 Subject: [PATCH 03/12] test TableProvider --- datafusion/core/src/test_util/mod.rs | 77 +++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 50e33b27e1bb..475f4a8cd52f 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -24,6 +24,7 @@ pub mod csv; use std::any::Any; use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; use std::fs::File; 
use std::io::Write; use std::path::Path; @@ -43,7 +44,7 @@ use crate::prelude::{CsvReadOptions, SessionContext}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_catalog::Session; -use datafusion_common::TableReference; +use datafusion_common::{DataFusionError, TableReference}; use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType}; use async_trait::async_trait; @@ -53,6 +54,8 @@ use tempfile::TempDir; #[cfg(feature = "parquet")] pub use datafusion_common::test_util::parquet_test_data; pub use datafusion_common::test_util::{arrow_test_data, get_data_dir}; +use datafusion_execution::TaskContext; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties}; /// Scan an empty data source, mainly used in tests pub fn scan_empty( @@ -217,7 +220,7 @@ impl TableProvider for TestTableProvider { _filters: &[Expr], _limit: Option, ) -> Result> { - unimplemented!("TestTableProvider is a stub for testing.") + Ok(Arc::new(TestTableExec {})) } } @@ -272,3 +275,73 @@ pub fn bounded_stream(batch: RecordBatch, limit: usize) -> SendableRecordBatchSt batch, }) } + +#[derive(Debug)] +struct TestTableExec {} + +impl DisplayAs for TestTableExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "TestTableExec") + } +} + +impl ExecutionPlan for TestTableExec { + fn name(&self) -> &str { + "TestTableExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + unimplemented!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + Err(DataFusionError::NotImplemented("for test".to_string())) + } +} + +#[cfg(test)] +mod tests { + use crate::prelude::SessionContext; + use crate::test_util::TestTableProvider; + use 
datafusion_catalog::TableProvider; + use datafusion_execution::TaskContext; + use std::sync::Arc; + + #[tokio::test] + async fn table_provider_scan_err() -> datafusion_common::Result<()> { + let provider = TestTableProvider { + url: "test".to_string(), + schema: Arc::new(arrow_schema::Schema::empty()), + }; + let exec = provider + .scan(&SessionContext::new().state(), None, &[], None) + .await?; + let result = exec.execute(1, Arc::new(TaskContext::default())); + let error = result.err(); + assert_eq!( + error.unwrap().strip_backtrace(), + "This feature is not implemented: for test" + ); + + Ok(()) + } +} From 3723ec50504c481b3708b757f16973a8ab61a532 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Sat, 1 Feb 2025 00:09:02 +0900 Subject: [PATCH 04/12] Revert "test TableProvider" This reverts commit 127705e2e05523b66ba8416eeac3226217e39f05. --- datafusion/core/src/test_util/mod.rs | 77 +--------------------------- 1 file changed, 2 insertions(+), 75 deletions(-) diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 475f4a8cd52f..50e33b27e1bb 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -24,7 +24,6 @@ pub mod csv; use std::any::Any; use std::collections::HashMap; -use std::fmt::{Debug, Formatter}; use std::fs::File; use std::io::Write; use std::path::Path; @@ -44,7 +43,7 @@ use crate::prelude::{CsvReadOptions, SessionContext}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_catalog::Session; -use datafusion_common::{DataFusionError, TableReference}; +use datafusion_common::TableReference; use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType}; use async_trait::async_trait; @@ -54,8 +53,6 @@ use tempfile::TempDir; #[cfg(feature = "parquet")] pub use datafusion_common::test_util::parquet_test_data; pub use datafusion_common::test_util::{arrow_test_data, get_data_dir}; -use 
datafusion_execution::TaskContext; -use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties}; /// Scan an empty data source, mainly used in tests pub fn scan_empty( @@ -220,7 +217,7 @@ impl TableProvider for TestTableProvider { _filters: &[Expr], _limit: Option, ) -> Result> { - Ok(Arc::new(TestTableExec {})) + unimplemented!("TestTableProvider is a stub for testing.") } } @@ -275,73 +272,3 @@ pub fn bounded_stream(batch: RecordBatch, limit: usize) -> SendableRecordBatchSt batch, }) } - -#[derive(Debug)] -struct TestTableExec {} - -impl DisplayAs for TestTableExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "TestTableExec") - } -} - -impl ExecutionPlan for TestTableExec { - fn name(&self) -> &str { - "TestTableExec" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - unimplemented!() - } - - fn children(&self) -> Vec<&Arc> { - vec![] - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> Result> { - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - Err(DataFusionError::NotImplemented("for test".to_string())) - } -} - -#[cfg(test)] -mod tests { - use crate::prelude::SessionContext; - use crate::test_util::TestTableProvider; - use datafusion_catalog::TableProvider; - use datafusion_execution::TaskContext; - use std::sync::Arc; - - #[tokio::test] - async fn table_provider_scan_err() -> datafusion_common::Result<()> { - let provider = TestTableProvider { - url: "test".to_string(), - schema: Arc::new(arrow_schema::Schema::empty()), - }; - let exec = provider - .scan(&SessionContext::new().state(), None, &[], None) - .await?; - let result = exec.execute(1, Arc::new(TaskContext::default())); - let error = result.err(); - assert_eq!( - error.unwrap().strip_backtrace(), - "This feature is not implemented: for test" - ); - - Ok(()) - } -} From 5d480a4b5a1b451b8331c460d9784897031cead1 Mon Sep 17 
00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Sun, 2 Feb 2025 00:23:57 +0900 Subject: [PATCH 05/12] create WrappedError --- datafusion/common/src/error.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index ccfd079bc780..2667720304c8 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -131,6 +131,10 @@ pub enum DataFusionError { /// Errors from either mapping LogicalPlans to/from Substrait plans /// or serializing/deserializing protobytes to Substrait plans Substrait(String), + + /// Errors for wrapping other errors. + /// This is useful when we need to share DatafusionError. + WrappedError(Arc), } #[macro_export] @@ -337,6 +341,7 @@ impl Error for DataFusionError { DataFusionError::External(e) => Some(e.as_ref()), DataFusionError::Context(_, e) => Some(e.as_ref()), DataFusionError::Substrait(_) => None, + DataFusionError::WrappedError(e) => Some(e.as_ref()), } } } @@ -450,6 +455,7 @@ impl DataFusionError { DataFusionError::External(_) => "External error: ", DataFusionError::Context(_, _) => "", DataFusionError::Substrait(_) => "Substrait error: ", + DataFusionError::WrappedError(_) => "", } } @@ -490,6 +496,7 @@ impl DataFusionError { Cow::Owned(format!("{desc}\ncaused by\n{}", *err)) } DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()), + DataFusionError::WrappedError(ref desc) => Cow::Owned(desc.to_string()), } } } From 05293c538418d1a7a1a7d8789a569e93088af1c3 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Sun, 2 Feb 2025 00:25:28 +0900 Subject: [PATCH 06/12] Replace `ExternalError` with `WrappedError` in `RepartitionExec` --- datafusion/physical-plan/src/repartition/mod.rs | 2 +- datafusion/sqllogictest/test_files/errors.slt | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 
8d180c212eba..dc15ed44393e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -916,7 +916,7 @@ impl RepartitionExec { for (_, tx) in txs { // wrap it because need to send error to all output partitions - let err = Err(DataFusionError::External(Box::new(Arc::clone(&e)))); + let err = Err(DataFusionError::WrappedError(Arc::clone(&e))); tx.send(Some(err)).await.ok(); } } diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt index a153a2e9cecf..609707ac9001 100644 --- a/datafusion/sqllogictest/test_files/errors.slt +++ b/datafusion/sqllogictest/test_files/errors.slt @@ -161,3 +161,12 @@ create table records (timestamp timestamp, value float) as values ( '2021-01-01 00:00:00', 1.0, '2021-01-01 00:00:00', 2.0 ); + +statement ok +CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER); + +statement ok +INSERT INTO tab0 VALUES(83,0,38); + +query error DataFusion error: Arrow error: Divide by zero error +SELECT DISTINCT - 84 FROM tab0 AS cor0 WHERE NOT + 96 / + col1 <= NULL GROUP BY col1, col0; \ No newline at end of file From 4dc64521710274e5ea6aa8a1820e5608577b83f7 Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Sun, 2 Feb 2025 00:37:35 +0900 Subject: [PATCH 07/12] Replace `ExternalError` with `WrappedError` in Join Exec --- datafusion/physical-plan/src/joins/cross_join.rs | 2 +- datafusion/physical-plan/src/joins/hash_join.rs | 4 ++-- datafusion/physical-plan/src/joins/nested_loop_join.rs | 2 +- datafusion/physical-plan/src/joins/utils.rs | 9 ++++----- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index ab94c132a209..cec717a25cf0 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -867,7 +867,7 @@ mod tests { assert_contains!( err.to_string(), - 
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec" ); Ok(()) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index bac72e8a0cc7..9a3882fff029 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -4014,7 +4014,7 @@ mod tests { // Asserting that operator-level reservation attempting to overallocate assert_contains!( err.to_string(), - "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput" ); assert_contains!( @@ -4099,7 +4099,7 @@ mod tests { // Asserting that stream-level reservation attempting to overallocate assert_contains!( err.to_string(), - "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]" ); diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index ce960df32ec2..cac44ec61585 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1513,7 +1513,7 @@ pub(crate) mod tests { assert_contains!( err.to_string(), - "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]" + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]" ); } diff --git 
a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 5327793d01e2..e31def2eb2b4 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1076,7 +1076,7 @@ impl OnceFut { OnceFutState::Ready(r) => Poll::Ready( r.as_ref() .map(|r| r.as_ref()) - .map_err(|e| DataFusionError::External(Box::new(Arc::clone(e)))), + .map_err(|e| DataFusionError::WrappedError(Arc::clone(e))), ), } } @@ -1090,10 +1090,9 @@ impl OnceFut { match &self.state { OnceFutState::Pending(_) => unreachable!(), - OnceFutState::Ready(r) => Poll::Ready( - r.clone() - .map_err(|e| DataFusionError::External(Box::new(e))), - ), + OnceFutState::Ready(r) => { + Poll::Ready(r.clone().map_err(DataFusionError::WrappedError)) + } } } } From f3f5b149d22fca6d8049c70b3cd3bf1ac79326ac Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Sun, 2 Feb 2025 00:39:31 +0900 Subject: [PATCH 08/12] sqllogictest --- datafusion/sqllogictest/test_files/aggregate.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index bd3b40089519..6bb04c9b1009 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -86,7 +86,7 @@ statement error DataFusion error: Error during planning: Failed to coerce argume SELECT approx_percentile_cont_with_weight(c3, c2, c1) FROM aggregate_test_100 # csv_query_approx_percentile_cont_with_histogram_bins -statement error DataFusion error: External error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\. +statement error DataFusion error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\. 
SELECT c1, approx_percentile_cont(c3, 0.95, -1000) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1 statement error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to approx_percentile_cont function: coercion from \[Int16, Float64, Utf8\] to the signature OneOf(.*) failed(.|\n)* From d4e860f0e9e5bfb4111a274450d622fb5457551a Mon Sep 17 00:00:00 2001 From: Namgung Chan <9511chn@gmail.com> Date: Mon, 3 Feb 2025 09:46:05 +0900 Subject: [PATCH 09/12] rename SharedError --- datafusion/common/src/error.rs | 8 ++++---- datafusion/physical-plan/src/joins/utils.rs | 4 ++-- datafusion/physical-plan/src/repartition/mod.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 0b68cc38fe77..524791e9149e 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -139,7 +139,7 @@ pub enum DataFusionError { /// Errors for wrapping other errors. /// This is useful when we need to share DatafusionError. 
- WrappedError(Arc), + SharedError(Arc), } #[macro_export] @@ -347,7 +347,7 @@ impl Error for DataFusionError { DataFusionError::Context(_, e) => Some(e.as_ref()), DataFusionError::Substrait(_) => None, DataFusionError::Diagnostic(_, e) => Some(e.as_ref()), - DataFusionError::WrappedError(e) => Some(e.as_ref()), + DataFusionError::SharedError(e) => Some(e.as_ref()), } } } @@ -462,7 +462,7 @@ impl DataFusionError { DataFusionError::Context(_, _) => "", DataFusionError::Substrait(_) => "Substrait error: ", DataFusionError::Diagnostic(_, _) => "", - DataFusionError::WrappedError(_) => "", + DataFusionError::SharedError(_) => "", } } @@ -504,7 +504,7 @@ impl DataFusionError { } DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()), DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()), - DataFusionError::WrappedError(ref desc) => Cow::Owned(desc.to_string()), + DataFusionError::SharedError(ref desc) => Cow::Owned(desc.to_string()), } } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e31def2eb2b4..ddfc6349d82f 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1076,7 +1076,7 @@ impl OnceFut { OnceFutState::Ready(r) => Poll::Ready( r.as_ref() .map(|r| r.as_ref()) - .map_err(|e| DataFusionError::WrappedError(Arc::clone(e))), + .map_err(|e| DataFusionError::SharedError(Arc::clone(e))), ), } } @@ -1091,7 +1091,7 @@ impl OnceFut { match &self.state { OnceFutState::Pending(_) => unreachable!(), OnceFutState::Ready(r) => { - Poll::Ready(r.clone().map_err(DataFusionError::WrappedError)) + Poll::Ready(r.clone().map_err(DataFusionError::SharedError)) } } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index dc15ed44393e..79bc4b1bdc17 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -916,7 +916,7 @@ 
impl RepartitionExec { for (_, tx) in txs { // wrap it because need to send error to all output partitions - let err = Err(DataFusionError::WrappedError(Arc::clone(&e))); + let err = Err(DataFusionError::SharedError(Arc::clone(&e))); tx.send(Some(err)).await.ok(); } } From 63de9b7cb786b15da2b0c655e0d018642f932eea Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Feb 2025 06:29:20 -0500 Subject: [PATCH 10/12] Update comments and rename to DataFusionError::Shared --- datafusion/common/src/error.rs | 16 +++++++++------- datafusion/physical-plan/src/joins/utils.rs | 4 ++-- datafusion/physical-plan/src/repartition/mod.rs | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index 524791e9149e..c3df5ed9d92f 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -136,10 +136,12 @@ pub enum DataFusionError { /// human-readable messages, and locations in the source query that relate /// to the error in some way. Diagnostic(Box, Box), - - /// Errors for wrapping other errors. - /// This is useful when we need to share DatafusionError. - SharedError(Arc), + /// Error that wrapping other [`DataFusionError`]s + /// + /// This is useful when the same underlying [`DataFusionError`] is passed + /// to multiple receivers. For example, when the source of a repartition + /// errors and the error is propagated to multiple consumers. 
+ Shared(Arc), } #[macro_export] @@ -347,7 +349,7 @@ impl Error for DataFusionError { DataFusionError::Context(_, e) => Some(e.as_ref()), DataFusionError::Substrait(_) => None, DataFusionError::Diagnostic(_, e) => Some(e.as_ref()), - DataFusionError::SharedError(e) => Some(e.as_ref()), + DataFusionError::Shared(e) => Some(e.as_ref()), } } } @@ -462,7 +464,7 @@ impl DataFusionError { DataFusionError::Context(_, _) => "", DataFusionError::Substrait(_) => "Substrait error: ", DataFusionError::Diagnostic(_, _) => "", - DataFusionError::SharedError(_) => "", + DataFusionError::Shared(_) => "", } } @@ -504,7 +506,7 @@ impl DataFusionError { } DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()), DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()), - DataFusionError::SharedError(ref desc) => Cow::Owned(desc.to_string()), + DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()), } } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index ddfc6349d82f..939f508758b2 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1076,7 +1076,7 @@ impl OnceFut { OnceFutState::Ready(r) => Poll::Ready( r.as_ref() .map(|r| r.as_ref()) - .map_err(|e| DataFusionError::SharedError(Arc::clone(e))), + .map_err(|e| DataFusionError::Shared(Arc::clone(e))), ), } } @@ -1091,7 +1091,7 @@ impl OnceFut { match &self.state { OnceFutState::Pending(_) => unreachable!(), OnceFutState::Ready(r) => { - Poll::Ready(r.clone().map_err(DataFusionError::SharedError)) + Poll::Ready(r.clone().map_err(DataFusionError::Shared)) } } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 79bc4b1bdc17..be05ecc86124 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -916,7 +916,7 @@ impl RepartitionExec { for (_, tx) in txs { // wrap it 
because need to send error to all output partitions - let err = Err(DataFusionError::SharedError(Arc::clone(&e))); + let err = Err(DataFusionError::Shared(Arc::clone(&e))); tx.send(Some(err)).await.ok(); } } From b671378b337a8b364c101de031461ff6c51c397f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Feb 2025 06:47:41 -0500 Subject: [PATCH 11/12] Improve API --- datafusion/common/src/error.rs | 13 ++++++++++++- datafusion/physical-plan/src/joins/utils.rs | 2 +- datafusion/physical-plan/src/repartition/mod.rs | 3 ++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index c3df5ed9d92f..af3a774e06bd 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -136,7 +136,7 @@ pub enum DataFusionError { /// human-readable messages, and locations in the source query that relate /// to the error in some way. Diagnostic(Box, Box), - /// Error that wrapping other [`DataFusionError`]s + /// A [`DataFusionError`] which shares an underlying [`DataFusionError`]. /// /// This is useful when the same underlying [`DataFusionError`] is passed /// to multiple receivers. 
For example, when the source of a repartition @@ -268,6 +268,17 @@ impl From for ArrowError { } } +impl From<&Arc> for DataFusionError { + fn from(e: &Arc) -> Self { + if let DataFusionError::Shared(e_inner) = e.as_ref() { + // don't re-wrap + DataFusionError::Shared(Arc::clone(e_inner)) + } else { + DataFusionError::Shared(Arc::clone(e)) + } + } +} + #[cfg(feature = "parquet")] impl From for DataFusionError { fn from(e: ParquetError) -> Self { diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 939f508758b2..a157c41f9a41 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1076,7 +1076,7 @@ impl OnceFut { OnceFutState::Ready(r) => Poll::Ready( r.as_ref() .map(|r| r.as_ref()) - .map_err(|e| DataFusionError::Shared(Arc::clone(e))), + .map_err(|e| DataFusionError::from(e)), ), } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 327f73550a55..b05c7d5a67d0 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -911,11 +911,12 @@ impl RepartitionExec { } // Error from running input task Ok(Err(e)) => { + // send the same Arc'd error to all output partitions let e = Arc::new(e); for (_, tx) in txs { // wrap it because need to send error to all output partitions - let err = Err(DataFusionError::Shared(Arc::clone(&e))); + let err = Err(DataFusionError::from(&e)); tx.send(Some(err)).await.ok(); } } From e390b97de40956334a89cc352be4f132b324ce82 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Feb 2025 07:34:13 -0500 Subject: [PATCH 12/12] fix clippy --- datafusion/physical-plan/src/joins/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a157c41f9a41..6fdb54a7d98e 100644 --- 
a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1076,7 +1076,7 @@ impl OnceFut { OnceFutState::Ready(r) => Poll::Ready( r.as_ref() .map(|r| r.as_ref()) - .map_err(|e| DataFusionError::from(e)), + .map_err(DataFusionError::from), ), } }