Skip to content

Commit

Permalink
support limit in agg exec for ser/deser
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 committed May 28, 2024
1 parent 3dc1773 commit e9f23d2
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 2 deletions.
6 changes: 6 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,11 @@ message MaybePhysicalSortExprs {
repeated PhysicalSortExprNode sort_expr = 1;
}

// Optional row limit carried by an AggregateExecNode (see its `limit` field).
message AggLimit {
// The limit is wrapped in its own message so that the field can be left
// unset: an absent AggLimit means "no limit", while a present one carries
// the limit value below.
uint64 limit = 1;
}

message AggregateExecNode {
repeated PhysicalExprNode group_expr = 1;
repeated PhysicalExprNode aggr_expr = 2;
Expand All @@ -1122,6 +1127,7 @@ message AggregateExecNode {
repeated PhysicalExprNode null_expr = 8;
repeated bool groups = 9;
repeated MaybeFilter filter_expr = 10;
AggLimit limit = 11;
}

message GlobalLimitExecNode {
Expand Down
111 changes: 111 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,23 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
})
.collect::<Result<Vec<_>, _>>()?;

Ok(Arc::new(AggregateExec::try_new(
let limit = hash_agg
.limit
.as_ref()
.map(|lit_value| lit_value.limit as usize);

let agg = AggregateExec::try_new(
agg_mode,
PhysicalGroupBy::new(group_expr, null_expr, groups),
physical_aggr_expr,
physical_filter_expr,
input,
physical_schema,
)?))
)?;

let agg = agg.with_limit(limit);

Ok(Arc::new(agg))
}
PhysicalPlanType::HashJoin(hashjoin) => {
let left: Arc<dyn ExecutionPlan> = into_physical_plan(
Expand Down Expand Up @@ -1504,6 +1513,10 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
.map(|expr| serialize_physical_expr(expr.0.to_owned(), extension_codec))
.collect::<Result<Vec<_>>>()?;

let limit = exec.limit().map(|value| protobuf::AggLimit {
limit: value as u64,
});

return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
protobuf::AggregateExecNode {
Expand All @@ -1517,6 +1530,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
input_schema: Some(input_schema.as_ref().try_into()?),
null_expr,
groups,
limit,
},
))),
});
Expand Down
27 changes: 27 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,33 @@ fn rountrip_aggregate() -> Result<()> {
Ok(())
}

/// Builds a final-mode `AggregateExec` (group by `a`, `AVG(b)`) with a limit
/// of 12 set via `with_limit`, and passes it through `roundtrip_test`.
#[test]
fn rountrip_aggregate_with_limit() -> Result<()> {
    // Two non-nullable Int64 columns: `a` is the group key, `b` is averaged.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));

    let group_by: Vec<(Arc<dyn PhysicalExpr>, String)> =
        vec![(col("a", &schema)?, "unused".to_string())];

    // AVG is computed over `b` cast to Float64.
    let avg_input = cast(col("b", &schema)?, &schema, DataType::Float64)?;
    let aggr_exprs: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
        avg_input,
        "AVG(b)".to_string(),
        DataType::Float64,
    ))];

    let plan = AggregateExec::try_new(
        AggregateMode::Final,
        PhysicalGroupBy::new_single(group_by),
        aggr_exprs,
        vec![None],
        Arc::new(EmptyExec::new(Arc::clone(&schema))),
        schema,
    )?
    .with_limit(Some(12));

    roundtrip_test(Arc::new(plan))
}

#[test]
fn roundtrip_aggregate_udaf() -> Result<()> {
let field_a = Field::new("a", DataType::Int64, false);
Expand Down

0 comments on commit e9f23d2

Please sign in to comment.