Skip to content

Commit

Permalink
[Feature]support max_by/min_by in window op (#54961)
Browse files Browse the repository at this point in the history
Signed-off-by: before-Sunrise <[email protected]>
(cherry picked from commit da2ef02)
  • Loading branch information
before-Sunrise authored and mergify[bot] committed Jan 26, 2025
1 parent 84f14bb commit ca2a9de
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 0 deletions.
72 changes: 72 additions & 0 deletions be/src/exprs/agg/maxmin_by.h
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,42 @@ class MaxMinByAggregateFunction final
}
}

// Window-function output path: appends the aggregated result held in `state` to `dst`
// once for every row index in [start, end).
//
//   ctx        - not used by this implementation.
//   state      - aggregate state, read through this->data(state).
//   dst        - output column; it is only ever appended to and may be nullable.
//   start, end - half-open row range; nothing is appended when start >= end.
//
// "No result" is signalled by `null_result` when State::not_filter_nulls_flag is set and by an
// empty `buffer_result` otherwise; in that case `dst` receives one append_default() per row.
// Otherwise `buffer_result` is deserialized into the data column of `dst` once per row, preceded
// by DATUM_NOT_NULL markers in the null column when `dst` is nullable.
void get_values(FunctionContext* ctx, ConstAggDataPtr __restrict state, Column* dst, size_t start,
                size_t end) const override {
    const auto& agg_state = this->data(state);

    // Decide once whether there is a value to emit; only the detection differs between the
    // two State flavours, the emission logic below is shared.
    bool has_no_result;
    if constexpr (State::not_filter_nulls_flag) {
        has_no_result = agg_state.null_result;
        if (has_no_result) {
            // A null_result is only expected together with a nullable destination.
            DCHECK(dst->is_nullable());
        }
    } else {
        has_no_result = agg_state.buffer_result.empty();
    }

    if (has_no_result) {
        for (size_t i = start; i < end; ++i) {
            dst->append_default();
        }
        return;
    }

    // First pass: mark every produced row as NOT NULL. The cast is loop-invariant, so it is
    // resolved once instead of per row.
    if (dst->is_nullable()) {
        auto&& null_flags = down_cast<NullableColumn*>(dst)->null_column();
        for (size_t i = start; i < end; ++i) {
            null_flags->append(DATUM_NOT_NULL);
        }
    }

    // Second pass: replicate the serialized result into the underlying data column.
    auto* data_column = ColumnHelper::get_data_column(dst);
    const auto* serialized = agg_state.buffer_result.data();
    for (size_t i = start; i < end; ++i) {
        data_column->deserialize_and_append(serialized);
    }
}

// Returns the fixed identifier "maxmin_by"; the value does not depend on any template argument.
std::string get_name() const override {
    return std::string("maxmin_by");
}
};

Expand Down Expand Up @@ -793,6 +829,42 @@ class MaxMinByAggregateFunction<LT, State, OP, RunTimeCppType<LT>, StringLTGuard
}
}

// Window-function output path for the string specialization: appends the aggregated result
// held in `state` to `dst` once for every row index in [start, end).
//
//   ctx        - not used by this implementation.
//   state      - aggregate state, read through this->data(state).
//   dst        - output column; it is only ever appended to and may be nullable.
//   start, end - half-open row range; nothing is appended when start >= end.
//
// "No result" is signalled by `null_result` when State::not_filter_nulls_flag is set and by an
// empty `buffer_result` otherwise; in that case `dst` receives one append_default() per row.
// Otherwise `buffer_result` is deserialized into the data column of `dst` once per row, preceded
// by DATUM_NOT_NULL markers in the null column when `dst` is nullable.
void get_values(FunctionContext* ctx, ConstAggDataPtr __restrict state, Column* dst, size_t start,
                size_t end) const override {
    const auto& agg_state = this->data(state);

    // Decide once whether there is a value to emit; only the detection differs between the
    // two State flavours, the emission logic below is shared.
    bool has_no_result;
    if constexpr (State::not_filter_nulls_flag) {
        has_no_result = agg_state.null_result;
        if (has_no_result) {
            // A null_result is only expected together with a nullable destination.
            DCHECK(dst->is_nullable());
        }
    } else {
        has_no_result = agg_state.buffer_result.empty();
    }

    if (has_no_result) {
        for (size_t i = start; i < end; ++i) {
            dst->append_default();
        }
        return;
    }

    // First pass: mark every produced row as NOT NULL. The cast is loop-invariant, so it is
    // resolved once instead of per row.
    if (dst->is_nullable()) {
        auto&& null_flags = down_cast<NullableColumn*>(dst)->null_column();
        for (size_t i = start; i < end; ++i) {
            null_flags->append(DATUM_NOT_NULL);
        }
    }

    // Second pass: replicate the serialized result into the underlying data column.
    auto* data_column = ColumnHelper::get_data_column(dst);
    const auto* serialized = agg_state.buffer_result.data();
    for (size_t i = start; i < end; ++i) {
        data_column->deserialize_and_append(serialized);
    }
}

// Returns the fixed identifier "maxmin_by"; the value does not depend on any template argument.
std::string get_name() const override {
    return std::string("maxmin_by");
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,4 +352,30 @@ select (sum(murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(a,0))+murmur_h
select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(a,0))+murmur_hash3_32(ifnull(b,0)))) as fingerprint from (select c0,max_by(c2,concat(coalesce(c2,'NULL'),c3)) over(partition by c1) a,min_by(c2,concat(coalesce(c2,'NULL'),c3)) over(partition by c1) b from t0) as t;
-- result:
25406211869
-- !result
-- name: test_max_min_by_support_window
CREATE TABLE exam (
subject_id INT,
subject STRING,
exam_result INT
) DISTRIBUTED BY HASH(`subject_id`) PROPERTIES ("replication_num" = "1");
-- result:
-- !result
insert into exam values
(1,'math',90),
(2,'english',70),
(3,'physics',95),
(4,'chemistry',85),
(5,'music',95),
(6,'biology',null);
-- result:
-- !result
SELECT max_by(subject, exam_result) over(partition by subject_id) FROM exam;
-- result:
english
None
physics
chemistry
music
math
-- !result
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,21 @@ select (sum(murmur_hash3_32(ifnull(__c_0,0))+murmur_hash3_32(ifnull(a,0))+murmu
select (sum(murmur_hash3_32(ifnull(__c_0,0))+murmur_hash3_32(ifnull(a,0))+murmur_hash3_32(ifnull(b,0)))) as fingerprint from (select (count(DISTINCT c1)) as __c_0 ,max_by(c2,concat(coalesce(c2,'NULL'),c3)) a,min_by(c2,concat(coalesce(c2,'NULL'),c3)) b from t0) as t;
select (sum(murmur_hash3_32(ifnull(c2,0))+murmur_hash3_32(ifnull(a,0))+murmur_hash3_32(ifnull(b,0)))) as fingerprint from (select c2,max_by(c0,coalesce(c0,0)*1000+c1) over(partition by c2) a,min_by(c0,coalesce(c0,0)*1000+c1) over(partition by c2) b from t0) as t;
select (sum(murmur_hash3_32(ifnull(c0,0))+murmur_hash3_32(ifnull(a,0))+murmur_hash3_32(ifnull(b,0)))) as fingerprint from (select c0,max_by(c2,concat(coalesce(c2,'NULL'),c3)) over(partition by c1) a,min_by(c2,concat(coalesce(c2,'NULL'),c3)) over(partition by c1) b from t0) as t;

-- name: test_max_min_by_support_window
-- Exercises max_by() used as a window function (OVER ... PARTITION BY).
-- Fixture: one row per subject; subject_id is both the distribution key and the partition key below.
CREATE TABLE exam (
subject_id INT,
subject STRING,
exam_result INT
) DISTRIBUTED BY HASH(`subject_id`) PROPERTIES ("replication_num" = "1");


-- Every subject_id is distinct, so each window partition holds exactly one row.
-- The last row carries a NULL exam_result, i.e. a NULL value in the max_by ordering argument.
insert into exam values
(1,'math',90),
(2,'english',70),
(3,'physics',95),
(4,'chemistry',85),
(5,'music',95),
(6,'biology',null);

-- NOTE(review): the query has no ORDER BY, so the order of returned rows is not fixed by the SQL
-- itself -- confirm the harness compares results order-insensitively.
SELECT max_by(subject, exam_result) over(partition by subject_id) FROM exam;

0 comments on commit ca2a9de

Please sign in to comment.