From 4895da39dc76ae9a58dc2092f115a5932d4b5042 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Fri, 24 Jan 2025 20:13:33 -0500 Subject: [PATCH 01/13] bug: Fix NULL handling in array_slice This commit fixes the array_slice function so that if any arguments are NULL, the result is NULL. Previously, array_slice would return an internal error if any of the arguments were NULL. This behavior matches the behavior of DuckDB for array_slice. Fixes #10548 --- datafusion/functions-nested/src/extract.rs | 15 ++++++++------- datafusion/sqllogictest/test_files/array.slt | 6 ++++++ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index cce10d2bf6db..c20058b64af7 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -27,7 +27,7 @@ use arrow::array::MutableArrayData; use arrow::array::OffsetSizeTrait; use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType; -use arrow_schema::DataType::{FixedSizeList, LargeList, List}; +use arrow_schema::DataType::{FixedSizeList, LargeList, List, Null}; use arrow_schema::Field; use datafusion_common::cast::as_int64_array; use datafusion_common::cast::as_large_list_array; @@ -419,16 +419,17 @@ fn array_slice_inner(args: &[ArrayRef]) -> Result { None }; - let from_array = as_int64_array(&args[1])?; - let to_array = as_int64_array(&args[2])?; - + let null_input = args.iter().find(|arg| arg.data_type().is_null()); let array_data_type = args[0].data_type(); - match array_data_type { - List(_) => { + match (array_data_type, null_input) { + (List(_) | LargeList(_) | Null, Some(null)) => Ok(Arc::clone(null)), + (List(_), None) => { let array = as_list_array(&args[0])?; + let from_array = as_int64_array(&args[1])?; + let to_array = as_int64_array(&args[2])?; general_array_slice::(array, from_array, to_array, stride) } - LargeList(_) => { + (LargeList(_), None) => { let array = as_large_list_array(&args[0])?; let from_array = as_int64_array(&args[1])?; let to_array = as_int64_array(&args[2])?; diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index baf4ef7795e7..8796365abb59 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -2036,6 +2036,12 @@ select array_slice(a, -1, 2, 1), array_slice(a, -1, 2), query error DataFusion error: Error during planning: array_slice does not support zero arguments select array_slice(); +# Testing with NULLs should result in a NULL +query B +select array_slice(NULL, NULL, NULL) IS NULL; +---- +true + ## array_any_value (aliases: list_any_value) # Testing with empty arguments should result in an error From 10bea2b06d72095c619056fc862783580db6c28d Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 25 Jan 2025 12:40:20 -0500 Subject: [PATCH 02/13] Fix optimizer error --- datafusion/functions-nested/src/extract.rs | 17 ++++++--- datafusion/sqllogictest/test_files/array.slt | 38 ++++++++++++-------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index c20058b64af7..383104b8b53d 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -419,17 +419,24 @@ fn array_slice_inner(args: &[ArrayRef]) -> Result { None }; - let null_input = args.iter().find(|arg| arg.data_type().is_null()); + let null_input = args.iter().any(|arg| arg.data_type().is_null()); let array_data_type = args[0].data_type(); - match (array_data_type, null_input) { - (List(_) | LargeList(_) | Null, Some(null)) => Ok(Arc::clone(null)), - (List(_), None) => { + match array_data_type { + Null => Ok(Arc::clone(&args[0])), + List(field_ref) if null_input => Ok(Arc::new(GenericListArray::::new_null( + field_ref.clone(), + 1, + ))), + LargeList(field_ref) if null_input => Ok(Arc::new( + GenericListArray::::new_null(field_ref.clone(), 1), + )), + List(_) => { let array = as_list_array(&args[0])?; let from_array = as_int64_array(&args[1])?; let to_array = as_int64_array(&args[2])?; general_array_slice::(array, from_array, to_array, stride) } - (LargeList(_), None) => { + LargeList(_) => { let array = as_large_list_array(&args[0])?; let from_array = as_int64_array(&args[1])?; let to_array = as_int64_array(&args[2])?; diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 8796365abb59..b37ef7ee1b5f 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -1817,18 +1817,26 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 0, [1, 2, 3, 4] [h, e, l] # array_slice scalar function #8 (with NULL and positive number) -query error +query ?? select array_slice(make_array(1, 2, 3, 4, 5), NULL, 4), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL, 3); +---- +NULL NULL -query error +query ?? select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), NULL, 4), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)'), NULL, 3); +---- +NULL NULL # array_slice scalar function #9 (with positive number and NULL) -query error +query ?? select array_slice(make_array(1, 2, 3, 4, 5), 2, NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), 3, NULL); +---- +NULL NULL -query error +query ?? select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 2, NULL), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)'), 3, NULL); +---- +NULL NULL # array_slice scalar function #10 (with zero-zero) query ?? @@ -1861,18 +1869,26 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 0, [1, 2] [h, e, l] # array_slice scalar function #13 (with negative number and NULL) -query error +query ?? select array_slice(make_array(1, 2, 3, 4, 5), -2, NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), -3, NULL); +---- +NULL NULL -query error +query ?? select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), -2, NULL), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)'), -3, NULL); +---- +NULL NULL # array_slice scalar function #14 (with NULL and negative number) -query error +query ?? select array_slice(make_array(1, 2, 3, 4, 5), NULL, -4), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL, -3); +---- +NULL NULL -query error +query ?? select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), NULL, -4), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)'), NULL, -3); +---- +NULL NULL # array_slice scalar function #15 (with negative indexes) query ?? @@ -2036,12 +2052,6 @@ select array_slice(a, -1, 2, 1), array_slice(a, -1, 2), query error DataFusion error: Error during planning: array_slice does not support zero arguments select array_slice(); -# Testing with NULLs should result in a NULL -query B -select array_slice(NULL, NULL, NULL) IS NULL; ----- -true - ## array_any_value (aliases: list_any_value) # Testing with empty arguments should result in an error From 8da10c3cfa43ba279d8e8ee73b465b3982b4cc5f Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 25 Jan 2025 13:17:11 -0500 Subject: [PATCH 03/13] Fix clippy --- datafusion/functions-nested/src/extract.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index 383104b8b53d..64703a982eab 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -424,11 +424,11 @@ fn array_slice_inner(args: &[ArrayRef]) -> Result { match array_data_type { Null => Ok(Arc::clone(&args[0])), List(field_ref) if null_input => Ok(Arc::new(GenericListArray::::new_null( - field_ref.clone(), + Arc::clone(field_ref), 1, ))), LargeList(field_ref) if null_input => Ok(Arc::new( - GenericListArray::::new_null(field_ref.clone(), 1), + GenericListArray::::new_null(Arc::clone(field_ref), 1), )), List(_) => { let array = as_list_array(&args[0])?; From 622bc72c81a30f02bf769910f4b189ea263b3fe2 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 25 Jan 2025 20:33:23 -0500 Subject: [PATCH 04/13] PoC for strict functions --- datafusion/expr-common/src/signature.rs | 26 ++++++++++ datafusion/functions-nested/src/extract.rs | 21 +++----- datafusion/functions-nested/src/flatten.rs | 1 + .../physical-expr/src/scalar_function.rs | 52 ++++++++++++++++++- datafusion/sqllogictest/test_files/array.slt | 9 ++-- 5 files changed, 90 insertions(+), 19 deletions(-) diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index 56f3029a4d7a..79a649300333 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -465,6 +465,8 @@ pub struct Signature { pub type_signature: TypeSignature, /// The volatility of the function. See [Volatility] for more information. pub volatility: Volatility, + /// When true, the function always returns null whenever any of its arguments are null. + pub strict: bool, } impl Signature { @@ -473,6 +475,7 @@ impl Signature { Signature { type_signature, volatility, + strict: false, } } /// An arbitrary number of arguments with the same type, from those listed in `common_types`. @@ -480,6 +483,7 @@ impl Signature { Self { type_signature: TypeSignature::Variadic(common_types), volatility, + strict: false, } } /// User-defined coercion rules for the function. @@ -487,6 +491,7 @@ impl Signature { Self { type_signature: TypeSignature::UserDefined, volatility, + strict: false, } } @@ -495,6 +500,7 @@ impl Signature { Self { type_signature: TypeSignature::Numeric(arg_count), volatility, + strict: false, } } @@ -503,6 +509,7 @@ impl Signature { Self { type_signature: TypeSignature::String(arg_count), volatility, + strict: false, } } @@ -511,6 +518,7 @@ impl Signature { Self { type_signature: TypeSignature::VariadicAny, volatility, + strict: false, } } /// A fixed number of arguments of the same type, from those listed in `valid_types`. @@ -522,6 +530,7 @@ impl Signature { Self { type_signature: TypeSignature::Uniform(arg_count, valid_types), volatility, + strict: false, } } /// Exactly matches the types in `exact_types`, in order. @@ -529,6 +538,7 @@ impl Signature { Signature { type_signature: TypeSignature::Exact(exact_types), volatility, + strict: false, } } /// Target coerce types in order @@ -539,6 +549,7 @@ impl Signature { Self { type_signature: TypeSignature::Coercible(target_types), volatility, + strict: false, } } @@ -547,6 +558,7 @@ impl Signature { Self { type_signature: TypeSignature::Comparable(arg_count), volatility, + strict: false, } } @@ -554,6 +566,7 @@ impl Signature { Signature { type_signature: TypeSignature::Nullary, volatility, + strict: false, } } @@ -562,6 +575,7 @@ impl Signature { Signature { type_signature: TypeSignature::Any(arg_count), volatility, + strict: false, } } /// Any one of a list of [TypeSignature]s. @@ -569,6 +583,7 @@ impl Signature { Signature { type_signature: TypeSignature::OneOf(type_signatures), volatility, + strict: false, } } /// Specialized Signature for ArrayAppend and similar functions @@ -578,6 +593,7 @@ impl Signature { ArrayFunctionSignature::ArrayAndElement, ), volatility, + strict: false, } } /// Specialized Signature for Array functions with an optional index @@ -587,6 +603,7 @@ impl Signature { ArrayFunctionSignature::ArrayAndElementAndOptionalIndex, ), volatility, + strict: false, } } /// Specialized Signature for ArrayPrepend and similar functions @@ -596,6 +613,7 @@ impl Signature { ArrayFunctionSignature::ElementAndArray, ), volatility, + strict: false, } } /// Specialized Signature for ArrayElement and similar functions @@ -605,6 +623,7 @@ impl Signature { ArrayFunctionSignature::ArrayAndIndex, ), volatility, + strict: false, } } /// Specialized Signature for ArrayEmpty and similar functions @@ -612,8 +631,15 @@ impl Signature { Signature { type_signature: TypeSignature::ArraySignature(ArrayFunctionSignature::Array), volatility, + strict: false, } } + + /// Returns an equivalent Signature, with strict set to true. + pub fn with_strict(mut self) -> Self { + self.strict = true; + self + } } #[cfg(test)] diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index 64703a982eab..7fd17f4fcd05 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -27,7 +27,7 @@ use arrow::array::MutableArrayData; use arrow::array::OffsetSizeTrait; use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType; -use arrow_schema::DataType::{FixedSizeList, LargeList, List, Null}; +use arrow_schema::DataType::{FixedSizeList, LargeList, List}; use arrow_schema::Field; use datafusion_common::cast::as_int64_array; use datafusion_common::cast::as_large_list_array; @@ -330,7 +330,8 @@ pub(super) struct ArraySlice { impl ArraySlice { pub fn new() -> Self { Self { - signature: Signature::variadic_any(Volatility::Immutable), + // TODO: This signature should use the actual accepted types, not variadic_any. + signature: Signature::variadic_any(Volatility::Immutable).with_strict(), aliases: vec![String::from("list_slice")], } } @@ -419,27 +420,17 @@ fn array_slice_inner(args: &[ArrayRef]) -> Result { None }; - let null_input = args.iter().any(|arg| arg.data_type().is_null()); + let from_array = as_int64_array(&args[1])?; + let to_array = as_int64_array(&args[2])?; + let array_data_type = args[0].data_type(); match array_data_type { - Null => Ok(Arc::clone(&args[0])), - List(field_ref) if null_input => Ok(Arc::new(GenericListArray::::new_null( - Arc::clone(field_ref), - 1, - ))), - LargeList(field_ref) if null_input => Ok(Arc::new( - GenericListArray::::new_null(Arc::clone(field_ref), 1), - )), List(_) => { let array = as_list_array(&args[0])?; - let from_array = as_int64_array(&args[1])?; - let to_array = as_int64_array(&args[2])?; general_array_slice::(array, from_array, to_array, stride) } LargeList(_) => { let array = as_large_list_array(&args[0])?; - let from_array = as_int64_array(&args[1])?; - let to_array = as_int64_array(&args[2])?; general_array_slice::(array, from_array, to_array, stride) } _ => exec_err!("array_slice does not support type: {:?}", array_data_type), diff --git a/datafusion/functions-nested/src/flatten.rs b/datafusion/functions-nested/src/flatten.rs index 30bf2fcbf624..1d97e095207c 100644 --- a/datafusion/functions-nested/src/flatten.rs +++ b/datafusion/functions-nested/src/flatten.rs @@ -80,6 +80,7 @@ impl Flatten { ArrayFunctionSignature::RecursiveArray, ), volatility: Volatility::Immutable, + strict: false, }, aliases: vec![], } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 936adbc098d6..1c55b30a4acc 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -39,7 +39,7 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use arrow_array::Array; +use arrow_array::{Array, LargeListArray, ListArray}; use datafusion_common::{internal_err, DFSchema, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; @@ -186,6 +186,56 @@ impl PhysicalExpr for ScalarFunctionExpr { .map(|e| e.evaluate(batch)) .collect::>>()?; + if self.fun.signature().strict && args.iter().any(|arg| arg.data_type().is_null()) + { + let null_value = match &self.return_type { + DataType::Null => ScalarValue::Null, + DataType::Boolean => ScalarValue::Boolean(None), + DataType::Int8 => ScalarValue::Int8(None), + DataType::Int16 => ScalarValue::Int16(None), + DataType::Int32 => ScalarValue::Int32(None), + DataType::Int64 => ScalarValue::Int64(None), + DataType::UInt8 => ScalarValue::UInt8(None), + DataType::UInt16 => ScalarValue::UInt16(None), + DataType::UInt32 => ScalarValue::UInt32(None), + DataType::UInt64 => ScalarValue::UInt64(None), + DataType::Float16 => ScalarValue::Float16(None), + DataType::Float32 => ScalarValue::Float32(None), + DataType::Float64 => ScalarValue::Float64(None), + DataType::Timestamp(_, _) => todo!(), + DataType::Date32 => todo!(), + DataType::Date64 => todo!(), + DataType::Time32(_) => todo!(), + DataType::Time64(_) => todo!(), + DataType::Duration(_) => todo!(), + DataType::Interval(_) => todo!(), + DataType::Binary => todo!(), + DataType::FixedSizeBinary(_) => todo!(), + DataType::LargeBinary => todo!(), + DataType::BinaryView => todo!(), + DataType::Utf8 => todo!(), + DataType::LargeUtf8 => todo!(), + DataType::Utf8View => todo!(), + DataType::List(field_ref) => ScalarValue::List(Arc::new( + ListArray::new_null(Arc::clone(field_ref), 1), + )), + DataType::ListView(_) => todo!(), + DataType::FixedSizeList(_, _) => todo!(), + DataType::LargeList(field_ref) => ScalarValue::LargeList(Arc::new( + LargeListArray::new_null(Arc::clone(field_ref), 1), + )), + DataType::LargeListView(_) => todo!(), + DataType::Struct(_) => todo!(), + DataType::Union(_, _) => todo!(), + DataType::Dictionary(_, _) => todo!(), + DataType::Decimal128(_, _) => todo!(), + DataType::Decimal256(_, _) => todo!(), + DataType::Map(_, _) => todo!(), + DataType::RunEndEncoded(_, _) => todo!(), + }; + return Ok(ColumnarValue::Scalar(null_value)); + } + let input_empty = args.is_empty(); let input_all_scalar = args .iter() diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index b37ef7ee1b5f..ce2d2bf8d6f8 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -1850,12 +1850,15 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), 0, [] [] # array_slice scalar function #11 (with NULL-NULL) -query error +query ?? select array_slice(make_array(1, 2, 3, 4, 5), NULL), array_slice(make_array('h', 'e', 'l', 'l', 'o'), NULL); +---- +NULL NULL -query error +query ?? select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), NULL), array_slice(arrow_cast(make_array('h', 'e', 'l', 'l', 'o'), 'LargeList(Utf8)'), NULL); - +---- +NULL NULL # array_slice scalar function #12 (with zero and negative number) query ?? From eb450cc038dc624c5205c8b93af108329da192ca Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 25 Jan 2025 20:41:06 -0500 Subject: [PATCH 05/13] Generate all null values --- .../physical-expr/src/scalar_function.rs | 48 +------------------ 1 file changed, 2 insertions(+), 46 deletions(-) diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 1c55b30a4acc..8fb978ad6c9a 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -39,7 +39,7 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use arrow_array::{Array, LargeListArray, ListArray}; +use arrow_array::Array; use datafusion_common::{internal_err, DFSchema, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; @@ -188,51 +188,7 @@ impl PhysicalExpr for ScalarFunctionExpr { if self.fun.signature().strict && args.iter().any(|arg| arg.data_type().is_null()) { - let null_value = match &self.return_type { - DataType::Null => ScalarValue::Null, - DataType::Boolean => ScalarValue::Boolean(None), - DataType::Int8 => ScalarValue::Int8(None), - DataType::Int16 => ScalarValue::Int16(None), - DataType::Int32 => ScalarValue::Int32(None), - DataType::Int64 => ScalarValue::Int64(None), - DataType::UInt8 => ScalarValue::UInt8(None), - DataType::UInt16 => ScalarValue::UInt16(None), - DataType::UInt32 => ScalarValue::UInt32(None), - DataType::UInt64 => ScalarValue::UInt64(None), - DataType::Float16 => ScalarValue::Float16(None), - DataType::Float32 => ScalarValue::Float32(None), - DataType::Float64 => ScalarValue::Float64(None), - DataType::Timestamp(_, _) => todo!(), - DataType::Date32 => todo!(), - DataType::Date64 => todo!(), - DataType::Time32(_) => todo!(), - DataType::Time64(_) => todo!(), - DataType::Duration(_) => todo!(), - DataType::Interval(_) => todo!(), - DataType::Binary => todo!(), - DataType::FixedSizeBinary(_) => todo!(), - DataType::LargeBinary => todo!(), - DataType::BinaryView => todo!(), - DataType::Utf8 => todo!(), - DataType::LargeUtf8 => todo!(), - DataType::Utf8View => todo!(), - DataType::List(field_ref) => ScalarValue::List(Arc::new( - ListArray::new_null(Arc::clone(field_ref), 1), - )), - DataType::ListView(_) => todo!(), - DataType::FixedSizeList(_, _) => todo!(), - DataType::LargeList(field_ref) => ScalarValue::LargeList(Arc::new( - LargeListArray::new_null(Arc::clone(field_ref), 1), - )), - DataType::LargeListView(_) => todo!(), - DataType::Struct(_) => todo!(), - DataType::Union(_, _) => todo!(), - DataType::Dictionary(_, _) => todo!(), - DataType::Decimal128(_, _) => todo!(), - DataType::Decimal256(_, _) => todo!(), - DataType::Map(_, _) => todo!(), - DataType::RunEndEncoded(_, _) => todo!(), - }; + let null_value = ScalarValue::try_from(&self.return_type)?; return Ok(ColumnarValue::Scalar(null_value)); } From d7503cd972886cfecf9d498bf48540c1f66a3c17 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 29 Jan 2025 04:00:45 -0500 Subject: [PATCH 06/13] Add NullHandling enum --- datafusion/expr-common/src/signature.rs | 55 +++++++++++-------- datafusion/expr/src/lib.rs | 4 +- datafusion/functions-nested/src/extract.rs | 5 +- datafusion/functions-nested/src/flatten.rs | 7 +-- .../physical-expr/src/scalar_function.rs | 4 +- 5 files changed, 42 insertions(+), 33 deletions(-) diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index 79a649300333..d79b0a10e8d0 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -455,6 +455,15 @@ fn get_data_types(native_type: &NativeType) -> Vec { } } +/// A function's behavior when the input is Null. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)] +pub enum NullHandling { + /// Null inputs are passed into the function implementation. + PassThrough, + /// Any Null input causes the function to return Null. + Propagate, +} + /// Defines the supported argument types ([`TypeSignature`]) and [`Volatility`] for a function. /// /// DataFusion will automatically coerce (cast) argument types to one of the supported @@ -465,8 +474,8 @@ pub struct Signature { pub type_signature: TypeSignature, /// The volatility of the function. See [Volatility] for more information. pub volatility: Volatility, - /// When true, the function always returns null whenever any of its arguments are null. - pub strict: bool, + /// The Null handling of the function. See [NullHandling] for more information. + pub null_handling: NullHandling, } impl Signature { @@ -475,7 +484,7 @@ impl Signature { Signature { type_signature, volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// An arbitrary number of arguments with the same type, from those listed in `common_types`. @@ -483,7 +492,7 @@ impl Signature { Self { type_signature: TypeSignature::Variadic(common_types), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// User-defined coercion rules for the function. @@ -491,7 +500,7 @@ impl Signature { Self { type_signature: TypeSignature::UserDefined, volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } @@ -500,7 +509,7 @@ impl Signature { Self { type_signature: TypeSignature::Numeric(arg_count), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } @@ -509,7 +518,7 @@ impl Signature { Self { type_signature: TypeSignature::String(arg_count), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } @@ -518,7 +527,7 @@ impl Signature { Self { type_signature: TypeSignature::VariadicAny, volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// A fixed number of arguments of the same type, from those listed in `valid_types`. @@ -530,7 +539,7 @@ impl Signature { Self { type_signature: TypeSignature::Uniform(arg_count, valid_types), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// Exactly matches the types in `exact_types`, in order. @@ -538,7 +547,7 @@ impl Signature { Signature { type_signature: TypeSignature::Exact(exact_types), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// Target coerce types in order @@ -549,7 +558,7 @@ impl Signature { Self { type_signature: TypeSignature::Coercible(target_types), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } @@ -558,7 +567,7 @@ impl Signature { Self { type_signature: TypeSignature::Comparable(arg_count), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } @@ -566,7 +575,7 @@ impl Signature { Signature { type_signature: TypeSignature::Nullary, volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } @@ -575,7 +584,7 @@ impl Signature { Signature { type_signature: TypeSignature::Any(arg_count), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// Any one of a list of [TypeSignature]s. @@ -583,7 +592,7 @@ impl Signature { Signature { type_signature: TypeSignature::OneOf(type_signatures), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// Specialized Signature for ArrayAppend and similar functions @@ -593,7 +602,7 @@ impl Signature { ArrayFunctionSignature::ArrayAndElement, ), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// Specialized Signature for Array functions with an optional index @@ -603,7 +612,7 @@ impl Signature { ArrayFunctionSignature::ArrayAndElementAndOptionalIndex, ), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// Specialized Signature for ArrayPrepend and similar functions @@ -613,7 +622,7 @@ impl Signature { ArrayFunctionSignature::ElementAndArray, ), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// Specialized Signature for ArrayElement and similar functions @@ -623,7 +632,7 @@ impl Signature { ArrayFunctionSignature::ArrayAndIndex, ), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } /// Specialized Signature for ArrayEmpty and similar functions @@ -631,13 +640,13 @@ impl Signature { Signature { type_signature: TypeSignature::ArraySignature(ArrayFunctionSignature::Array), volatility, - strict: false, + null_handling: NullHandling::PassThrough, } } - /// Returns an equivalent Signature, with strict set to true. - pub fn with_strict(mut self) -> Self { - self.strict = true; + /// Returns an equivalent Signature, with null_handling set to the input. + pub fn with_null_handling(mut self, null_handling: NullHandling) -> Self { + self.null_handling = null_handling; self } } diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 017415da8f23..238428d58c8d 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -71,8 +71,8 @@ pub use datafusion_expr_common::columnar_value::ColumnarValue; pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; pub use datafusion_expr_common::operator::Operator; pub use datafusion_expr_common::signature::{ - ArrayFunctionSignature, Signature, TypeSignature, TypeSignatureClass, Volatility, - TIMEZONE_WILDCARD, + ArrayFunctionSignature, NullHandling, Signature, TypeSignature, TypeSignatureClass, + Volatility, TIMEZONE_WILDCARD, }; pub use datafusion_expr_common::type_coercion::binary; pub use expr::{ diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index 7fd17f4fcd05..9d8c60ce2613 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -37,7 +37,7 @@ use datafusion_common::{ }; use datafusion_expr::Expr; use datafusion_expr::{ - ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, + ColumnarValue, Documentation, NullHandling, ScalarUDFImpl, Signature, Volatility, }; use datafusion_macros::user_doc; use std::any::Any; @@ -331,7 +331,8 @@ impl ArraySlice { pub fn new() -> Self { Self { // TODO: This signature should use the actual accepted types, not variadic_any. - signature: Signature::variadic_any(Volatility::Immutable).with_strict(), + signature: Signature::variadic_any(Volatility::Immutable) + .with_null_handling(NullHandling::Propagate), aliases: vec![String::from("list_slice")], } } diff --git a/datafusion/functions-nested/src/flatten.rs b/datafusion/functions-nested/src/flatten.rs index 1d97e095207c..60657ba8f533 100644 --- a/datafusion/functions-nested/src/flatten.rs +++ b/datafusion/functions-nested/src/flatten.rs @@ -26,10 +26,7 @@ use datafusion_common::cast::{ as_generic_list_array, as_large_list_array, as_list_array, }; use datafusion_common::{exec_err, Result}; -use datafusion_expr::{ - ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature, - TypeSignature, Volatility, -}; +use datafusion_expr::{ArrayFunctionSignature, ColumnarValue, Documentation, NullHandling, ScalarUDFImpl, Signature, TypeSignature, Volatility}; use datafusion_macros::user_doc; use std::any::Any; use std::sync::Arc; @@ -80,7 +77,7 @@ impl Flatten { ArrayFunctionSignature::RecursiveArray, ), volatility: Volatility::Immutable, - strict: false, + null_handling: NullHandling::PassThrough, }, aliases: vec![], } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index 8fb978ad6c9a..f64c1f7642dd 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -47,6 +47,7 @@ use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf; use datafusion_expr::{ expr_vec_fmt, ColumnarValue, Expr, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF, }; +use datafusion_expr_common::signature::NullHandling; /// Physical expression of a scalar function #[derive(Eq, PartialEq, Hash)] @@ -186,7 +187,8 @@ impl PhysicalExpr for ScalarFunctionExpr { .map(|e| e.evaluate(batch)) .collect::>>()?; - if self.fun.signature().strict && args.iter().any(|arg| arg.data_type().is_null()) + if self.fun.signature().null_handling == NullHandling::Propagate + && args.iter().any(|arg| arg.data_type().is_null()) { let null_value = ScalarValue::try_from(&self.return_type)?; return Ok(ColumnarValue::Scalar(null_value)); From 07a0769e497475d3a2acb873fe11b3012ce38c8e Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 29 Jan 2025 04:49:25 -0500 Subject: [PATCH 07/13] Update array_slice signature --- datafusion/expr-common/src/signature.rs | 21 ++++++++++----- .../expr/src/type_coercion/functions.rs | 13 ++++++--- datafusion/functions-nested/src/extract.rs | 27 ++++++++++++++++--- datafusion/functions-nested/src/flatten.rs | 5 +++- .../physical-expr/src/scalar_function.rs | 4 ++- datafusion/sqllogictest/test_files/array.slt | 3 +++ 6 files changed, 58 insertions(+), 15 deletions(-) diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index d79b0a10e8d0..38f070ad492d 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -19,6 +19,7 @@ //! and return types of functions in DataFusion. use std::fmt::Display; +use std::num::NonZeroUsize; use crate::type_coercion::aggregates::NUMERICS; use arrow::datatypes::{DataType, IntervalUnit, TimeUnit}; @@ -236,9 +237,9 @@ pub enum ArrayFunctionSignature { /// The first argument should be non-list or list, and the second argument should be List/LargeList. /// The first argument's list dimension should be one dimension less than the second argument's list dimension. ElementAndArray, - /// Specialized Signature for Array functions of the form (List/LargeList, Index) - /// The first argument should be List/LargeList/FixedSizedList, and the second argument should be Int64. - ArrayAndIndex, + /// Specialized Signature for Array functions of the form (List/LargeList, Index+) + /// The first argument should be List/LargeList/FixedSizedList, and the next n arguments should be Int64. + ArrayAndIndexes(NonZeroUsize), /// Specialized Signature for Array functions of the form (List/LargeList, Element, Optional Index) ArrayAndElementAndOptionalIndex, /// Specialized Signature for ArrayEmpty and similar functions @@ -265,8 +266,12 @@ impl Display for ArrayFunctionSignature { ArrayFunctionSignature::ElementAndArray => { write!(f, "element, array") } - ArrayFunctionSignature::ArrayAndIndex => { - write!(f, "array, index") + ArrayFunctionSignature::ArrayAndIndexes(count) => { + write!(f, "array")?; + for _ in 0..count.get() { + write!(f, ", index")?; + } + Ok(()) } ArrayFunctionSignature::Array => { write!(f, "array") @@ -627,9 +632,13 @@ impl Signature { } /// Specialized Signature for ArrayElement and similar functions pub fn array_and_index(volatility: Volatility) -> Self { + Self::array_and_indexes(volatility, NonZeroUsize::new(1).expect("1 is non-zero")) + } + /// Specialized Signature for ArraySlice and similar functions + pub fn array_and_indexes(volatility: Volatility, count: NonZeroUsize) -> Self { Signature { type_signature: TypeSignature::ArraySignature( - ArrayFunctionSignature::ArrayAndIndex, + ArrayFunctionSignature::ArrayAndIndexes(count), ), volatility, null_handling: NullHandling::PassThrough, diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 650619e6de4c..8cebf7c3db12 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -671,13 +671,20 @@ fn get_valid_types( ArrayFunctionSignature::ElementAndArray => { array_append_or_prepend_valid_types(current_types, false)? } - ArrayFunctionSignature::ArrayAndIndex => { - if current_types.len() != 2 { + ArrayFunctionSignature::ArrayAndIndexes(count) => { + if current_types.len() != count.get() + 1 { return Ok(vec![vec![]]); } array(¤t_types[0]).map_or_else( || vec![vec![]], - |array_type| vec![vec![array_type, DataType::Int64]], + |array_type| { + let mut inner = Vec::with_capacity(count.get() + 1); + inner.push(array_type); + for _ in 0..count.get() { + inner.push(DataType::Int64); + } + vec![inner] + }, ) } ArrayFunctionSignature::ArrayAndElementAndOptionalIndex => { diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index 9d8c60ce2613..c32180d965fb 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -35,12 +35,13 @@ use datafusion_common::cast::as_list_array; use datafusion_common::{ exec_err, internal_datafusion_err, plan_err, DataFusionError, Result, }; -use datafusion_expr::Expr; +use datafusion_expr::{ArrayFunctionSignature, Expr, TypeSignature}; use datafusion_expr::{ ColumnarValue, Documentation, NullHandling, ScalarUDFImpl, Signature, Volatility, }; use datafusion_macros::user_doc; use std::any::Any; +use std::num::NonZeroUsize; use std::sync::Arc; use crate::utils::make_scalar_function; @@ -330,9 +331,27 @@ pub(super) struct ArraySlice { impl ArraySlice { pub fn new() -> Self { Self { - // TODO: This signature should use the actual accepted types, not variadic_any. - signature: Signature::variadic_any(Volatility::Immutable) - .with_null_handling(NullHandling::Propagate), + signature: Signature::one_of( + vec![ + TypeSignature::ArraySignature( + ArrayFunctionSignature::ArrayAndIndexes( + NonZeroUsize::new(1).expect("1 is non-zero"), + ), + ), + TypeSignature::ArraySignature( + ArrayFunctionSignature::ArrayAndIndexes( + NonZeroUsize::new(2).expect("2 is non-zero"), + ), + ), + TypeSignature::ArraySignature( + ArrayFunctionSignature::ArrayAndIndexes( + NonZeroUsize::new(3).expect("3 is non-zero"), + ), + ), + ], + Volatility::Immutable, + ) + .with_null_handling(NullHandling::Propagate), aliases: vec![String::from("list_slice")], } } diff --git a/datafusion/functions-nested/src/flatten.rs b/datafusion/functions-nested/src/flatten.rs index 60657ba8f533..6f19d6b2192d 100644 --- a/datafusion/functions-nested/src/flatten.rs +++ b/datafusion/functions-nested/src/flatten.rs @@ -26,7 +26,10 @@ use datafusion_common::cast::{ as_generic_list_array, as_large_list_array, as_list_array, }; use datafusion_common::{exec_err, Result}; -use datafusion_expr::{ArrayFunctionSignature, ColumnarValue, Documentation, NullHandling, ScalarUDFImpl, Signature, TypeSignature, Volatility}; +use datafusion_expr::{ + ArrayFunctionSignature, ColumnarValue, Documentation, NullHandling, ScalarUDFImpl, + Signature, TypeSignature, Volatility, +}; use datafusion_macros::user_doc; use std::any::Any; use std::sync::Arc; diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index f64c1f7642dd..ce53febc42df 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -188,7 +188,9 @@ impl PhysicalExpr for ScalarFunctionExpr { .collect::>>()?; if self.fun.signature().null_handling == NullHandling::Propagate - && args.iter().any(|arg| arg.data_type().is_null()) + && args.iter().any( + |arg| matches!(arg, ColumnarValue::Scalar(scalar) if scalar.is_null()), + ) { let null_value = ScalarValue::try_from(&self.return_type)?; return Ok(ColumnarValue::Scalar(null_value)); diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index ce2d2bf8d6f8..1d4408fb37f4 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -2055,6 +2055,9 @@ select array_slice(a, -1, 2, 1), array_slice(a, -1, 2), query error DataFusion error: Error during planning: array_slice does not support zero arguments select array_slice(); +query error Failed to coerce arguments +select array_slice(3.5, NULL, NULL); + ## array_any_value (aliases: list_any_value) # Testing with empty arguments should result in an error From 9080fba15cdfdfa05fe6f67e2a289cc757a01e11 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 1 Feb 2025 10:00:51 -0500 Subject: [PATCH 08/13] Switch to trait method --- datafusion/expr-common/src/signature.rs | 35 ------------------- datafusion/expr/src/lib.rs | 4 +-- datafusion/expr/src/udaf.rs | 2 +- datafusion/expr/src/udf.rs | 20 +++++++++++ datafusion/functions-nested/src/extract.rs | 7 ++-- datafusion/functions-nested/src/flatten.rs | 3 +- .../physical-expr/src/scalar_function.rs | 7 ++-- 7 files changed, 31 insertions(+), 47 deletions(-) diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index 38f070ad492d..1bfae28af840 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -460,15 +460,6 @@ fn get_data_types(native_type: &NativeType) -> Vec { } } -/// A function's behavior when the input is Null. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)] -pub enum NullHandling { - /// Null inputs are passed into the function implementation. - PassThrough, - /// Any Null input causes the function to return Null. - Propagate, -} - /// Defines the supported argument types ([`TypeSignature`]) and [`Volatility`] for a function. /// /// DataFusion will automatically coerce (cast) argument types to one of the supported @@ -479,8 +470,6 @@ pub struct Signature { pub type_signature: TypeSignature, /// The volatility of the function. See [Volatility] for more information. pub volatility: Volatility, - /// The Null handling of the function. See [NullHandling] for more information. - pub null_handling: NullHandling, } impl Signature { @@ -489,7 +478,6 @@ impl Signature { Signature { type_signature, volatility, - null_handling: NullHandling::PassThrough, } } /// An arbitrary number of arguments with the same type, from those listed in `common_types`. @@ -497,7 +485,6 @@ impl Signature { Self { type_signature: TypeSignature::Variadic(common_types), volatility, - null_handling: NullHandling::PassThrough, } } /// User-defined coercion rules for the function. @@ -505,7 +492,6 @@ impl Signature { Self { type_signature: TypeSignature::UserDefined, volatility, - null_handling: NullHandling::PassThrough, } } @@ -514,7 +500,6 @@ impl Signature { Self { type_signature: TypeSignature::Numeric(arg_count), volatility, - null_handling: NullHandling::PassThrough, } } @@ -523,7 +508,6 @@ impl Signature { Self { type_signature: TypeSignature::String(arg_count), volatility, - null_handling: NullHandling::PassThrough, } } @@ -532,7 +516,6 @@ impl Signature { Self { type_signature: TypeSignature::VariadicAny, volatility, - null_handling: NullHandling::PassThrough, } } /// A fixed number of arguments of the same type, from those listed in `valid_types`. @@ -544,7 +527,6 @@ impl Signature { Self { type_signature: TypeSignature::Uniform(arg_count, valid_types), volatility, - null_handling: NullHandling::PassThrough, } } /// Exactly matches the types in `exact_types`, in order. @@ -552,7 +534,6 @@ impl Signature { Signature { type_signature: TypeSignature::Exact(exact_types), volatility, - null_handling: NullHandling::PassThrough, } } /// Target coerce types in order @@ -563,7 +544,6 @@ impl Signature { Self { type_signature: TypeSignature::Coercible(target_types), volatility, - null_handling: NullHandling::PassThrough, } } @@ -572,7 +552,6 @@ impl Signature { Self { type_signature: TypeSignature::Comparable(arg_count), volatility, - null_handling: NullHandling::PassThrough, } } @@ -580,7 +559,6 @@ impl Signature { Signature { type_signature: TypeSignature::Nullary, volatility, - null_handling: NullHandling::PassThrough, } } @@ -589,7 +567,6 @@ impl Signature { Signature { type_signature: TypeSignature::Any(arg_count), volatility, - null_handling: NullHandling::PassThrough, } } /// Any one of a list of [TypeSignature]s. @@ -597,7 +574,6 @@ impl Signature { Signature { type_signature: TypeSignature::OneOf(type_signatures), volatility, - null_handling: NullHandling::PassThrough, } } /// Specialized Signature for ArrayAppend and similar functions @@ -607,7 +583,6 @@ impl Signature { ArrayFunctionSignature::ArrayAndElement, ), volatility, - null_handling: NullHandling::PassThrough, } } /// Specialized Signature for Array functions with an optional index @@ -617,7 +592,6 @@ impl Signature { ArrayFunctionSignature::ArrayAndElementAndOptionalIndex, ), volatility, - null_handling: NullHandling::PassThrough, } } /// Specialized Signature for ArrayPrepend and similar functions @@ -627,7 +601,6 @@ impl Signature { ArrayFunctionSignature::ElementAndArray, ), volatility, - null_handling: NullHandling::PassThrough, } } /// Specialized Signature for ArrayElement and similar functions @@ -641,7 +614,6 @@ impl Signature { ArrayFunctionSignature::ArrayAndIndexes(count), ), volatility, - null_handling: NullHandling::PassThrough, } } /// Specialized Signature for ArrayEmpty and similar functions @@ -649,15 +621,8 @@ impl Signature { Signature { type_signature: TypeSignature::ArraySignature(ArrayFunctionSignature::Array), volatility, - null_handling: NullHandling::PassThrough, } } - - /// Returns an equivalent Signature, with null_handling set to the input. - pub fn with_null_handling(mut self, null_handling: NullHandling) -> Self { - self.null_handling = null_handling; - self - } } #[cfg(test)] diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 238428d58c8d..0d3b5cbbaef8 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -71,7 +71,7 @@ pub use datafusion_expr_common::columnar_value::ColumnarValue; pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; pub use datafusion_expr_common::operator::Operator; pub use datafusion_expr_common::signature::{ - ArrayFunctionSignature, NullHandling, Signature, TypeSignature, TypeSignatureClass, + ArrayFunctionSignature, Signature, TypeSignature, TypeSignatureClass, Volatility, TIMEZONE_WILDCARD, }; pub use datafusion_expr_common::type_coercion::binary; @@ -95,7 +95,7 @@ pub use udaf::{ }; pub use udf::{ scalar_doc_sections, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF, - ScalarUDFImpl, + ScalarUDFImpl, NullHandling }; pub use udwf::{window_doc_sections, ReversedUDWF, WindowUDF, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 56c9822495f8..44c5f6cfbe82 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -389,7 +389,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { /// Whether the aggregate function is nullable. /// - /// Nullable means that that the function could return `null` for any inputs. + /// Nullable means that the function could return `null` for any inputs. /// For example, aggregate functions like `COUNT` always return a non null value /// but others like `MIN` will return `NULL` if there is nullable input. /// Note that if the function is declared as *not* nullable, make sure the [`AggregateUDFImpl::default_value`] is `non-null` diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index aa6a5cddad95..035fbf211da6 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -200,6 +200,12 @@ impl ScalarUDF { self.inner.return_type_from_args(args) } + /// Retruns the behavior that this function has when any of the inputs are Null. + pub fn null_handling(&self) -> NullHandling { + self.inner.null_handling() + } + + /// Do the function rewrite /// /// See [`ScalarUDFImpl::simplify`] for more details. @@ -417,6 +423,15 @@ impl ReturnInfo { } } +/// A function's behavior when the input is Null. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)] +pub enum NullHandling { + /// Null inputs are passed into the function implementation. + PassThrough, + /// Any Null input causes the function to return Null. + Propagate, +} + /// Trait for implementing user defined scalar functions. /// /// This trait exposes the full API for implementing user defined functions and @@ -589,6 +604,11 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { true } + /// Retruns the behavior that this function has when any of the inputs are Null. + fn null_handling(&self) -> NullHandling { + NullHandling::PassThrough + } + /// Invoke the function on `args`, returning the appropriate result /// /// Note: This method is deprecated and will be removed in future releases. diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index c32180d965fb..cc5e502be788 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -350,8 +350,7 @@ impl ArraySlice { ), ], Volatility::Immutable, - ) - .with_null_handling(NullHandling::Propagate), + ), aliases: vec![String::from("list_slice")], } } @@ -395,6 +394,10 @@ impl ScalarUDFImpl for ArraySlice { Ok(arg_types[0].clone()) } + fn null_handling(&self) -> NullHandling { + NullHandling::Propagate + } + fn invoke_batch( &self, args: &[ColumnarValue], diff --git a/datafusion/functions-nested/src/flatten.rs b/datafusion/functions-nested/src/flatten.rs index 6f19d6b2192d..a665fb6a76aa 100644 --- a/datafusion/functions-nested/src/flatten.rs +++ b/datafusion/functions-nested/src/flatten.rs @@ -27,7 +27,7 @@ use datafusion_common::cast::{ }; use datafusion_common::{exec_err, Result}; use datafusion_expr::{ - ArrayFunctionSignature, ColumnarValue, Documentation, NullHandling, ScalarUDFImpl, + ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use datafusion_macros::user_doc; @@ -80,7 +80,6 @@ impl Flatten { ArrayFunctionSignature::RecursiveArray, ), volatility: Volatility::Immutable, - null_handling: NullHandling::PassThrough, }, aliases: vec![], } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index ce53febc42df..d60149c68da5 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -44,10 +44,7 @@ use datafusion_common::{internal_err, DFSchema, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf; -use datafusion_expr::{ - expr_vec_fmt, ColumnarValue, Expr, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF, -}; -use datafusion_expr_common::signature::NullHandling; +use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, NullHandling, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF}; /// Physical expression of a scalar function #[derive(Eq, PartialEq, Hash)] @@ -187,7 +184,7 @@ impl PhysicalExpr for ScalarFunctionExpr { .map(|e| e.evaluate(batch)) .collect::>>()?; - if self.fun.signature().null_handling == NullHandling::Propagate + if self.fun.null_handling() == NullHandling::Propagate && args.iter().any( |arg| matches!(arg, ColumnarValue::Scalar(scalar) if scalar.is_null()), ) From 97176587ea69dfed48dc38ed59331cf66ce0653e Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 1 Feb 2025 10:03:59 -0500 Subject: [PATCH 09/13] fmt --- datafusion/expr/src/lib.rs | 8 ++++---- datafusion/expr/src/udf.rs | 1 - datafusion/functions-nested/src/flatten.rs | 4 ++-- datafusion/physical-expr/src/scalar_function.rs | 5 ++++- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 0d3b5cbbaef8..b9a309faed59 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -71,8 +71,8 @@ pub use datafusion_expr_common::columnar_value::ColumnarValue; pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; pub use datafusion_expr_common::operator::Operator; pub use datafusion_expr_common::signature::{ - ArrayFunctionSignature, Signature, TypeSignature, TypeSignatureClass, - Volatility, TIMEZONE_WILDCARD, + ArrayFunctionSignature, Signature, TypeSignature, TypeSignatureClass, Volatility, + TIMEZONE_WILDCARD, }; pub use datafusion_expr_common::type_coercion::binary; pub use expr::{ @@ -94,8 +94,8 @@ pub use udaf::{ aggregate_doc_sections, AggregateUDF, AggregateUDFImpl, ReversedUDAF, StatisticsArgs, }; pub use udf::{ - scalar_doc_sections, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF, - ScalarUDFImpl, NullHandling + scalar_doc_sections, NullHandling, ReturnInfo, ReturnTypeArgs, ScalarFunctionArgs, + ScalarUDF, ScalarUDFImpl, }; pub use udwf::{window_doc_sections, ReversedUDWF, WindowUDF, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 035fbf211da6..274bf4714416 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -205,7 +205,6 @@ impl ScalarUDF { self.inner.null_handling() } - /// Do the function rewrite /// /// See [`ScalarUDFImpl::simplify`] for more details. diff --git a/datafusion/functions-nested/src/flatten.rs b/datafusion/functions-nested/src/flatten.rs index a665fb6a76aa..30bf2fcbf624 100644 --- a/datafusion/functions-nested/src/flatten.rs +++ b/datafusion/functions-nested/src/flatten.rs @@ -27,8 +27,8 @@ use datafusion_common::cast::{ }; use datafusion_common::{exec_err, Result}; use datafusion_expr::{ - ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, - Signature, TypeSignature, Volatility, + ArrayFunctionSignature, ColumnarValue, Documentation, ScalarUDFImpl, Signature, + TypeSignature, Volatility, }; use datafusion_macros::user_doc; use std::any::Any; diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index d60149c68da5..1cd4b673ce7f 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -44,7 +44,10 @@ use datafusion_common::{internal_err, DFSchema, Result, ScalarValue}; use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::ExprProperties; use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf; -use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, NullHandling, ReturnTypeArgs, ScalarFunctionArgs, ScalarUDF}; +use datafusion_expr::{ + expr_vec_fmt, ColumnarValue, Expr, NullHandling, ReturnTypeArgs, ScalarFunctionArgs, + ScalarUDF, +}; /// Physical expression of a scalar function #[derive(Eq, PartialEq, Hash)] From 3d21297cad1ced905e88f2a207e7d215e6ef4a35 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 1 Feb 2025 10:10:19 -0500 Subject: [PATCH 10/13] Fix typo --- datafusion/expr/src/udf.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 274bf4714416..7c91b6b3b4ab 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -200,7 +200,7 @@ impl ScalarUDF { self.inner.return_type_from_args(args) } - /// Retruns the behavior that this function has when any of the inputs are Null. + /// Returns the behavior that this function has when any of the inputs are Null. pub fn null_handling(&self) -> NullHandling { self.inner.null_handling() } @@ -603,7 +603,7 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { true } - /// Retruns the behavior that this function has when any of the inputs are Null. + /// Returns the behavior that this function has when any of the inputs are Null. fn null_handling(&self) -> NullHandling { NullHandling::PassThrough } From 8321f13c138663a166319926ae3037bd7c59dae3 Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 1 Feb 2025 12:57:48 -0500 Subject: [PATCH 11/13] Handle batch inputs --- datafusion/functions-nested/src/extract.rs | 34 ++++++------ datafusion/sqllogictest/test_files/array.slt | 56 ++++++++++---------- 2 files changed, 47 insertions(+), 43 deletions(-) diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index cc5e502be788..abeeb2e6aaa0 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -27,6 +27,7 @@ use arrow::array::MutableArrayData; use arrow::array::OffsetSizeTrait; use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType; +use arrow_buffer::NullBufferBuilder; use arrow_schema::DataType::{FixedSizeList, LargeList, List}; use arrow_schema::Field; use datafusion_common::cast::as_int64_array; @@ -475,7 +476,7 @@ where // use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls. let mut mutable = - MutableArrayData::with_capacities(vec![&original_data], false, capacity); + MutableArrayData::with_capacities(vec![&original_data], true, capacity); // We have the slice syntax compatible with DuckDB v0.8.1. // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb. @@ -538,30 +539,33 @@ where } let mut offsets = vec![O::usize_as(0)]; + let mut null_builder = NullBufferBuilder::new(array.len()); for (row_index, offset_window) in array.offsets().windows(2).enumerate() { let start = offset_window[0]; let end = offset_window[1]; let len = end - start; - // len 0 indicate array is null, return empty array in this row. + // If any input is null, return null. + if array.is_null(row_index) + || from_array.is_null(row_index) + || to_array.is_null(row_index) + { + mutable.extend_nulls(1); + offsets.push(offsets[row_index] + O::usize_as(1)); + null_builder.append_null(); + continue; + } + null_builder.append_non_null(); + + // Empty arrays always return an empty array. if len == O::usize_as(0) { offsets.push(offsets[row_index]); continue; } - // If index is null, we consider it as the minimum / maximum index of the array. - let from_index = if from_array.is_null(row_index) { - Some(O::usize_as(0)) - } else { - adjusted_from_index::(from_array.value(row_index), len)? - }; - - let to_index = if to_array.is_null(row_index) { - Some(len - O::usize_as(1)) - } else { - adjusted_to_index::(to_array.value(row_index), len)? - }; + let from_index = adjusted_from_index::(from_array.value(row_index), len)?; + let to_index = adjusted_to_index::(to_array.value(row_index), len)?; if let (Some(from), Some(to)) = (from_index, to_index) { let stride = stride.map(|s| s.value(row_index)); @@ -635,7 +639,7 @@ where Arc::new(Field::new_list_field(array.value_type(), true)), OffsetBuffer::::new(offsets.into()), arrow_array::make_array(data), - None, + null_builder.finish(), )?)) } diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 1d4408fb37f4..f4b409b2cae6 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -950,9 +950,9 @@ select column1[2:4], column2[1:4], column3[3:4] from arrays; [[5, 6]] [NULL, 5.5, 6.6] [NULL, u] [[7, 8]] [7.7, 8.8, 9.9] [l, o] [[9, 10]] [10.1, NULL, 12.2] [t] -[] [13.3, 14.4, 15.5] [e, t] -[[13, 14]] [] [] -[[NULL, 18]] [16.6, 17.7, 18.8] [] +NULL [13.3, 14.4, 15.5] [e, t] +[[13, 14]] NULL [] +[[NULL, 18]] [16.6, 17.7, 18.8] NULL # multiple index with columns #2 (zero index) query ??? @@ -962,9 +962,9 @@ select column1[0:5], column2[0:3], column3[0:9] from arrays; [[3, 4], [5, 6]] [NULL, 5.5, 6.6] [i, p, NULL, u, m] [[5, 6], [7, 8]] [7.7, 8.8, 9.9] [d, NULL, l, o, r] [[7, NULL], [9, 10]] [10.1, NULL, 12.2] [s, i, t] -[] [13.3, 14.4, 15.5] [a, m, e, t] -[[11, 12], [13, 14]] [] [,] -[[15, 16], [NULL, 18]] [16.6, 17.7, 18.8] [] +NULL [13.3, 14.4, 15.5] [a, m, e, t] +[[11, 12], [13, 14]] NULL [,] +[[15, 16], [NULL, 18]] [16.6, 17.7, 18.8] NULL # TODO: support negative index # multiple index with columns #3 (negative index) @@ -1026,9 +1026,9 @@ select column1[2:4:2], column2[1:4:2], column3[3:4:2] from arrays; [[5, 6]] [NULL, 6.6] [NULL] [[7, 8]] [7.7, 9.9] [l] [[9, 10]] [10.1, 12.2] [t] -[] [13.3, 15.5] [e] -[[13, 14]] [] [] -[[NULL, 18]] [16.6, 18.8] [] +NULL [13.3, 15.5] [e] +[[13, 14]] NULL [] +[[NULL, 18]] [16.6, 18.8] NULL # multiple index with columns #2 (zero index) query ??? @@ -1038,9 +1038,9 @@ select column1[0:5:2], column2[0:3:2], column3[0:9:2] from arrays; [[3, 4]] [NULL, 6.6] [i, NULL, m] [[5, 6]] [7.7, 9.9] [d, l, r] [[7, NULL]] [10.1, 12.2] [s, t] -[] [13.3, 15.5] [a, e] -[[11, 12]] [] [,] -[[15, 16]] [16.6, 18.8] [] +NULL [13.3, 15.5] [a, e] +[[11, 12]] NULL [,] +[[15, 16]] [16.6, 18.8] NULL ### Array function tests @@ -1579,7 +1579,7 @@ select array_pop_back(column1) from arrayspop; [3, 4, 5] [6, 7, 8, NULL] [NULL, NULL] -[] +NULL [NULL, 10, 11] query ? @@ -1589,7 +1589,7 @@ select array_pop_back(arrow_cast(column1, 'LargeList(Int64)')) from arrayspop; [3, 4, 5] [6, 7, 8, NULL] [NULL, NULL] -[] +NULL [NULL, 10, 11] query ? @@ -1599,7 +1599,7 @@ select array_pop_back(column1) from large_arrayspop; [3, 4, 5] [6, 7, 8, NULL] [NULL, NULL] -[] +NULL [NULL, 10, 11] query ? @@ -1609,7 +1609,7 @@ select array_pop_back(arrow_cast(column1, 'LargeList(Int64)')) from large_arrays [3, 4, 5] [6, 7, 8, NULL] [NULL, NULL] -[] +NULL [NULL, 10, 11] ## array_pop_front (aliases: `list_pop_front`) @@ -2001,9 +2001,9 @@ select array_slice(column1, column2, column3) from slices; [12, 13, 14, 15, 16, 17] [] [] -[] -[41, 42, 43, 44, 45, 46] -[55, 56, 57, 58, 59, 60] +NULL +NULL +NULL query ? select array_slice(arrow_cast(column1, 'LargeList(Int64)'), column2, column3) from slices; @@ -2012,9 +2012,9 @@ select array_slice(arrow_cast(column1, 'LargeList(Int64)'), column2, column3) fr [12, 13, 14, 15, 16, 17] [] [] -[] -[41, 42, 43, 44, 45, 46] -[55, 56, 57, 58, 59, 60] +NULL +NULL +NULL # TODO: support NULLS in output instead of `[]` # array_slice with columns and scalars @@ -2025,9 +2025,9 @@ select array_slice(make_array(1, 2, 3, 4, 5), column2, column3), array_slice(col [2] [13, 14, 15, 16, 17] [12, 13, 14, 15] [] [] [21, 22, 23, NULL, 25] [] [33, 34] [] -[4, 5] [] [] -[1, 2, 3, 4, 5] [43, 44, 45, 46] [41, 42, 43, 44, 45] -[5] [NULL, 54, 55, 56, 57, 58, 59, 60] [55] +[4, 5] NULL NULL +NULL [43, 44, 45, 46] NULL +NULL NULL [55] query ??? select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), column2, column3), array_slice(arrow_cast(column1, 'LargeList(Int64)'), 3, column3), array_slice(arrow_cast(column1, 'LargeList(Int64)'), column2, 5) from slices; @@ -2036,9 +2036,9 @@ select array_slice(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'), co [2] [13, 14, 15, 16, 17] [12, 13, 14, 15] [] [] [21, 22, 23, NULL, 25] [] [33, 34] [] -[4, 5] [] [] -[1, 2, 3, 4, 5] [43, 44, 45, 46] [41, 42, 43, 44, 45] -[5] [NULL, 54, 55, 56, 57, 58, 59, 60] [55] +[4, 5] NULL NULL +NULL [43, 44, 45, 46] NULL +NULL NULL [55] # Test issue: https://github.com/apache/datafusion/issues/10425 # `from` may be larger than `to` and `stride` is positive From bb14245d6a435576ea8e941fbbc5ed52d016b40c Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 1 Feb 2025 13:18:59 -0500 Subject: [PATCH 12/13] Fix comment --- datafusion/functions-nested/src/extract.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index abeeb2e6aaa0..669dd200d4ed 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -474,7 +474,6 @@ where let original_data = values.to_data(); let capacity = Capacities::Array(original_data.len()); - // use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls. let mut mutable = MutableArrayData::with_capacities(vec![&original_data], true, capacity); From cbc84d5a9d3af79d13c293522b365b8a25b7a1bd Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Sat, 1 Feb 2025 14:19:19 -0500 Subject: [PATCH 13/13] Update array_pop methods null handling --- datafusion/functions-nested/src/extract.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs index 669dd200d4ed..c87a96dca7a4 100644 --- a/datafusion/functions-nested/src/extract.rs +++ b/datafusion/functions-nested/src/extract.rs @@ -690,6 +690,10 @@ impl ScalarUDFImpl for ArrayPopFront { Ok(arg_types[0].clone()) } + fn null_handling(&self) -> NullHandling { + NullHandling::Propagate + } + fn invoke_batch( &self, args: &[ColumnarValue], @@ -790,6 +794,10 @@ impl ScalarUDFImpl for ArrayPopBack { Ok(arg_types[0].clone()) } + fn null_handling(&self) -> NullHandling { + NullHandling::Propagate + } + fn invoke_batch( &self, args: &[ColumnarValue],