Skip to content

Commit

Permalink
apacheGH-45269: [C++][Compute] Add pivot function
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Jan 16, 2025
1 parent df82d4c commit cce44f9
Show file tree
Hide file tree
Showing 9 changed files with 1,066 additions and 10 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,7 @@ if(ARROW_COMPUTE)
compute/kernels/aggregate_tdigest.cc
compute/kernels/aggregate_var_std.cc
compute/kernels/hash_aggregate.cc
compute/kernels/pivot_internal.cc
compute/kernels/scalar_arithmetic.cc
compute/kernels/scalar_boolean.cc
compute/kernels/scalar_compare.cc
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/acero/groupby_aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,13 @@ Result<AggregateNodeArgs<HashAggregateKernel>> GroupByNode::MakeAggregateNodeArg

// Find input field indices for aggregates
std::vector<std::vector<int>> agg_src_fieldsets(aggs.size());
// ARROW_LOG(INFO) << "input schema: " << input_schema->ToString();
for (size_t i = 0; i < aggs.size(); ++i) {
const auto& target_fieldset = aggs[i].target;
// ARROW_LOG(INFO) << "target #" << i << " has " << target_fieldset.size() << "
// targets";
for (const auto& target : target_fieldset) {
// ARROW_LOG(INFO) << " ... " << target.ToString();
ARROW_ASSIGN_OR_RAISE(auto match, target.FindOne(*input_schema));
agg_src_fieldsets[i].push_back(match[0]);
}
Expand All @@ -108,6 +112,8 @@ Result<AggregateNodeArgs<HashAggregateKernel>> GroupByNode::MakeAggregateNodeArg
std::vector<std::vector<TypeHolder>> agg_src_types(aggs.size());
for (size_t i = 0; i < aggs.size(); ++i) {
for (const auto& agg_src_field_id : agg_src_fieldsets[i]) {
// ARROW_LOG(INFO) << "target #" << i << " field = " <<
// input_schema->field(agg_src_field_id)->ToString();
agg_src_types[i].push_back(input_schema->field(agg_src_field_id)->type().get());
}
}
Expand Down Expand Up @@ -282,6 +288,11 @@ Status GroupByNode::Merge() {
DCHECK(state0->agg_states[span_i]);
batch_ctx.SetState(state0->agg_states[span_i].get());

// XXX this resizes each KernelState (state0->agg_states[span_i]) multiple times.
// An alternative would be a two-pass algorithm:
// 1. Compute all transpositions (one per local state) and the final number of
// groups.
// 2. Process all agg kernels, resizing each KernelState only once.
RETURN_NOT_OK(
agg_kernels_[span_i]->resize(&batch_ctx, state0->grouper->num_groups()));
RETURN_NOT_OK(agg_kernels_[span_i]->merge(
Expand Down
Loading

0 comments on commit cce44f9

Please sign in to comment.