Skip to content

Commit

Permalink
Update test to test the new changes
Browse files Browse the repository at this point in the history
  • Loading branch information
EmilyFlarionIO committed Feb 26, 2025
1 parent 69c86f2 commit 8c934f7
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
import org.apache.spark.sql.comet.CometHashAggregateExec
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.{count_distinct, sum}
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -830,11 +831,21 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
checkSparkAnswerAndNumOfAggregates("SELECT _2 + 2, SUM(_1) FROM tbl GROUP BY _2", 2)
checkSparkAnswerAndNumOfAggregates("SELECT _2, SUM(_1 + 1) FROM tbl GROUP BY _2", 2)

// result expression is unsupported by Comet, so only partial aggregation should be used
val df = sql(
"SELECT _2, MIN(_1) + java_method('java.lang.Math', 'random') " +
"FROM tbl GROUP BY _2")
assert(getNumCometHashAggregate(df) == 2)

val sparkPlan = stripAQEPlan(df.queryExecution.executedPlan)
// Comet should run both stages as native
assert(sparkPlan.collect { case s: CometHashAggregateExec => s }.size == 2)
// However,due to the result expression not being supported,
// it will be converted in to a ProjectExec with the final aggregate as its child
assert(sparkPlan.collect {
case ProjectExec(_, ha: CometHashAggregateExec)
if ha.mode.map(_.toString).contains("Final") =>
true
}.size == 1)
}
}
}
Expand Down

0 comments on commit 8c934f7

Please sign in to comment.