Skip to content

Commit

Permalink
[BugFix] MaxBy/MinBy not filter nulls (backport #51354)
Browse files Browse the repository at this point in the history
Signed-off-by: satanson <[email protected]>
(cherry picked from commit 9398edd)
Signed-off-by: satanson <[email protected]>
  • Loading branch information
satanson committed Sep 29, 2024
1 parent 0ff8659 commit 1154fe4
Show file tree
Hide file tree
Showing 13 changed files with 1,485 additions and 211 deletions.
3 changes: 2 additions & 1 deletion be/src/exec/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ Status Aggregator::prepare(RuntimeState* state, ObjectPool* pool, RuntimeProfile

// Because max_by and min_by function have two input types,
// so we use its second arguments type as input.
if (fn.name.function_name == "max_by" || fn.name.function_name == "min_by") {
if (fn.name.function_name == "max_by" || fn.name.function_name == "min_by" ||
fn.name.function_name == "max_by_v2" || fn.name.function_name == "min_by_v2") {
arg_type = TypeDescriptor::from_thrift(fn.arg_types[1]);
}

Expand Down
8 changes: 7 additions & 1 deletion be/src/exec/analytor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,13 @@ Status Analytor::prepare(RuntimeState* state, ObjectPool* pool, RuntimeProfile*
real_fn_name += "_in";
_need_partition_materializing = true;
}
func = get_window_function(real_fn_name, arg_type.type, return_type.type, is_input_nullable, fn.binary_type,
const auto& fname = fn.name.function_name;
auto real_arg_type = arg_type.type;
if (fname == "max_by" || fname == "min_by" || fname == "max_by_v2" || fname == "min_by_v2") {
const TypeDescriptor arg1_type = TypeDescriptor::from_thrift(fn.arg_types[1]);
real_arg_type = arg1_type.type;
}
func = get_window_function(real_fn_name, real_arg_type, return_type.type, is_input_nullable, fn.binary_type,
state->func_version());
if (func == nullptr) {
return Status::InternalError(strings::Substitute(
Expand Down
16 changes: 8 additions & 8 deletions be/src/exprs/agg/factory/aggregate_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ class AggregateFactory {
template <LogicalType LT>
static auto MakeMaxAggregateFunction();

template <LogicalType LT>
template <LogicalType LT, bool not_filter_nulls>
static auto MakeMaxByAggregateFunction();

template <LogicalType LT>
template <LogicalType LT, bool not_filter_nulls>
static auto MakeMinByAggregateFunction();

template <LogicalType LT>
Expand Down Expand Up @@ -290,16 +290,16 @@ auto AggregateFactory::MakeMaxAggregateFunction() {
return std::make_shared<MaxMinAggregateFunction<LT, MaxAggregateData<LT>, MaxElement<LT, MaxAggregateData<LT>>>>();
}

template <LogicalType LT>
template <LogicalType LT, bool not_filter_nulls>
auto AggregateFactory::MakeMaxByAggregateFunction() {
return std::make_shared<
MaxMinByAggregateFunction<LT, MaxByAggregateData<LT>, MaxByElement<LT, MaxByAggregateData<LT>>>>();
using AggData = MaxByAggregateData<LT, not_filter_nulls>;
return std::make_shared<MaxMinByAggregateFunction<LT, AggData, MaxByElement<LT, AggData>>>();
}

template <LogicalType LT>
template <LogicalType LT, bool not_filter_nulls>
auto AggregateFactory::MakeMinByAggregateFunction() {
return std::make_shared<
MaxMinByAggregateFunction<LT, MinByAggregateData<LT>, MinByElement<LT, MinByAggregateData<LT>>>>();
using AggData = MinByAggregateData<LT, not_filter_nulls>;
return std::make_shared<MaxMinByAggregateFunction<LT, AggData, MinByElement<LT, AggData>>>();
}

template <LogicalType LT>
Expand Down
12 changes: 8 additions & 4 deletions be/src/exprs/agg/factory/aggregate_resolver_minmaxany.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,15 @@ struct MaxMinByDispatcherInner {
if constexpr ((lt_is_aggregate<arg_type> || lt_is_json<arg_type>)&&(lt_is_aggregate<ret_type> ||
lt_is_json<ret_type>)) {
if constexpr (is_max_by) {
resolver->add_aggregate_mapping_variadic<arg_type, ret_type, MaxByAggregateData<arg_type>>(
"max_by", true, AggregateFactory::MakeMaxByAggregateFunction<arg_type>());
resolver->add_aggregate_mapping_notnull<arg_type, ret_type>(
"max_by", true, AggregateFactory::MakeMaxByAggregateFunction<arg_type, false>());
resolver->add_aggregate_mapping_notnull<arg_type, ret_type>(
"max_by_v2", true, AggregateFactory::MakeMaxByAggregateFunction<arg_type, true>());
} else {
resolver->add_aggregate_mapping_variadic<arg_type, ret_type, MinByAggregateData<arg_type>>(
"min_by", true, AggregateFactory::MakeMinByAggregateFunction<arg_type>());
resolver->add_aggregate_mapping_notnull<arg_type, ret_type>(
"min_by", true, AggregateFactory::MakeMinByAggregateFunction<arg_type, false>());
resolver->add_aggregate_mapping_notnull<arg_type, ret_type>(
"min_by_v2", true, AggregateFactory::MakeMinByAggregateFunction<arg_type, true>());
}
}
}
Expand Down
Loading

0 comments on commit 1154fe4

Please sign in to comment.