Skip to content

Commit

Permalink
[FLINK-37321][table] Make window functions with wrong descriptors fail during planning rather than runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin authored Feb 19, 2025
1 parent 9a1e986 commit 18c6caf
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,6 @@ private SliceAssigner createSliceAssigner(
} else if (windowSpec instanceof CumulativeWindowSpec) {
Duration maxSize = ((CumulativeWindowSpec) windowSpec).getMaxSize();
Duration step = ((CumulativeWindowSpec) windowSpec).getStep();
if (maxSize.toMillis() % step.toMillis() != 0) {
throw new TableException(
String.format(
"CUMULATE table function based aggregate requires maxSize must be an "
+ "integral multiple of step, but got maxSize %s ms and step %s ms",
maxSize.toMillis(), step.toMillis()));
}
SliceAssigners.CumulativeSliceAssigner assigner =
SliceAssigners.cumulative(timeAttributeIndex, shiftTimeZone, maxSize, step);
Duration offset = ((CumulativeWindowSpec) windowSpec).getOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ object WindowUtil {
null
}
val interval = getOperandAsLong(windowCall.operands(2))
if (interval <= 0) {
throw new ValidationException(
s"TUMBLE table function based aggregate requires size to be positive," +
s" but got $interval ms.")
}
if (offset != null && Math.abs(offset.toMillis) >= interval) {
throw new ValidationException(
s"TUMBLE table function parameters must satisfy abs(offset) < size, " +
s"but got size $interval ms and offset ${offset.toMillis} ms.")
}
new TumblingWindowSpec(Duration.ofMillis(interval), offset)

case FlinkSqlOperatorTable.HOP =>
Expand All @@ -227,6 +237,11 @@ object WindowUtil {
}
val slide = getOperandAsLong(windowCall.operands(2))
val size = getOperandAsLong(windowCall.operands(3))
if (slide <= 0 || size <= 0) {
throw new ValidationException(
s"HOP table function based aggregate requires slide and size to be positive," +
s" but got slide $slide ms and size $size ms.")
}
new HoppingWindowSpec(Duration.ofMillis(size), Duration.ofMillis(slide), offset)

case FlinkSqlOperatorTable.CUMULATE =>
Expand All @@ -237,13 +252,27 @@ object WindowUtil {
}
val step = getOperandAsLong(windowCall.operands(2))
val maxSize = getOperandAsLong(windowCall.operands(3))
if (step <= 0 || maxSize <= 0) {
throw new ValidationException(
s"CUMULATE table function based aggregate requires maxSize and step to be positive," +
s" but got maxSize $maxSize ms and step $step ms.")
}
if (maxSize % step != 0) {
throw new ValidationException("CUMULATE table function based aggregate requires maxSize " +
s"must be an integral multiple of step, but got maxSize $maxSize ms and step $step ms.")
}
new CumulativeWindowSpec(Duration.ofMillis(maxSize), Duration.ofMillis(step), offset)
case FlinkSqlOperatorTable.SESSION =>
val tableArgCall = windowCall.operands(0).asInstanceOf[RexTableArgCall]
if (!tableArgCall.getOrderKeys.isEmpty) {
throw new ValidationException("Session window TVF doesn't support order by clause.")
}
val gap = getOperandAsLong(windowCall.operands(2))
if (gap <= 0) {
throw new ValidationException(
s"SESSION table function based aggregate requires gap to be positive," +
s" but got gap $gap ms.")
}
new SessionWindowSpec(Duration.ofMillis(gap), tableArgCall.getPartitionKeys)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ SELECT
b,
count(distinct c) AS uv
FROM TABLE(
CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '25' MINUTE, INTERVAL '1' HOUR))
CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '20' MINUTE, INTERVAL '1' HOUR))
GROUP BY GROUPING SETS ((a), (b)), window_start, window_end
]]>
</Resource>
Expand All @@ -677,7 +677,7 @@ GROUP BY GROUPING SETS ((a), (b)), window_start, window_end
LogicalProject(a=[$0], b=[$1], uv=[$4])
+- LogicalAggregate(group=[{0, 1, 2, 3}], groups=[[{0, 2, 3}, {1, 2, 3}]], uv=[COUNT(DISTINCT $4)])
+- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8], c=[$2])
+- LogicalTableFunctionScan(invocation=[CUMULATE(TABLE(#0), DESCRIPTOR($5), 1500000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalTableFunctionScan(invocation=[CUMULATE(TABLE(#0), DESCRIPTOR($5), 1200000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
Expand All @@ -687,7 +687,7 @@ LogicalProject(a=[$0], b=[$1], uv=[$4])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, uv])
+- WindowAggregate(groupBy=[a, b, $e], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[25 min])], select=[a, b, $e, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+- WindowAggregate(groupBy=[a, b, $e], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[20 min])], select=[a, b, $e, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[a, b, $e]])
+- Expand(projects=[{a, null AS b, c, 4 AS $e, rowtime}, {null AS a, b, c, 8 AS $e, rowtime}])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
Expand All @@ -703,7 +703,7 @@ SELECT
b,
count(distinct c) AS uv
FROM TABLE(
CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '25' MINUTE, INTERVAL '1' HOUR))
CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '20' MINUTE, INTERVAL '1' HOUR))
GROUP BY GROUPING SETS ((a), (b)), window_start, window_end
]]>
</Resource>
Expand All @@ -712,7 +712,7 @@ GROUP BY GROUPING SETS ((a), (b)), window_start, window_end
LogicalProject(a=[$0], b=[$1], uv=[$4])
+- LogicalAggregate(group=[{0, 1, 2, 3}], groups=[[{0, 2, 3}, {1, 2, 3}]], uv=[COUNT(DISTINCT $4)])
+- LogicalProject(a=[$0], b=[$1], window_start=[$7], window_end=[$8], c=[$2])
+- LogicalTableFunctionScan(invocation=[CUMULATE(TABLE(#0), DESCRIPTOR($5), 1500000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalTableFunctionScan(invocation=[CUMULATE(TABLE(#0), DESCRIPTOR($5), 1200000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
Expand All @@ -722,9 +722,9 @@ LogicalProject(a=[$0], b=[$1], uv=[$4])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, uv])
+- GlobalWindowAggregate(groupBy=[a, b, $e], window=[CUMULATE(slice_end=[$slice_end], max_size=[1 h], step=[25 min])], select=[a, b, $e, COUNT(distinct$0 count$0) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+- GlobalWindowAggregate(groupBy=[a, b, $e], window=[CUMULATE(slice_end=[$slice_end], max_size=[1 h], step=[20 min])], select=[a, b, $e, COUNT(distinct$0 count$0) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[a, b, $e]])
+- LocalWindowAggregate(groupBy=[a, b, $e], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[25 min])], select=[a, b, $e, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])
+- LocalWindowAggregate(groupBy=[a, b, $e], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[20 min])], select=[a, b, $e, COUNT(distinct$0 c) AS count$0, DISTINCT(c) AS distinct$0, slice_end('w$) AS $slice_end])
+- Expand(projects=[{a, null AS b, c, 4 AS $e, rowtime}, {null AS a, b, c, 8 AS $e, rowtime}])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
*/
package org.apache.flink.table.planner.plan.stream.sql

import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.utils.TableTestBase

import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}

import java.time.Duration

Expand Down Expand Up @@ -347,6 +349,103 @@ class WindowTableFunctionTest extends TableTestBase {
util.verifyRelPlan(sql)
}

/**
 * Checks that a TUMBLE window declared with a zero or negative size is rejected: the call to
 * `verifyRelPlan` must throw an exception whose cause is a [[ValidationException]] carrying the
 * offending size in milliseconds.
 */
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(ints = Array[Int](-1, 0))
def testTumbleWindowWithNonPositiveInterval(size: Int): Unit = {
  // The query declares the size in minutes; the expected message states it in milliseconds.
  val sizeMillis = size * 1000 * 60
  val expectedMessage =
    "TUMBLE table function based aggregate requires size to be positive," +
      s" but got $sizeMillis ms."
  val query =
    s"""
       |SELECT *
       |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$size' MINUTE))
       |""".stripMargin

  assertThatThrownBy(() => util.verifyRelPlan(query))
    .hasCause(new ValidationException(expectedMessage))
}

/**
 * Checks that a TUMBLE window whose offset does not satisfy `abs(offset) < size` is rejected:
 * the call to `verifyRelPlan` must throw an exception whose cause is a [[ValidationException]]
 * reporting both size and offset in milliseconds.
 */
@ParameterizedTest(name = "{index}: {0}, {1}")
@CsvSource(Array[String]("1, 2", "2, -2"))
def testTumbleWindowWithWrongOffset(size: Int, offset: Int): Unit = {
  // Both intervals are declared in minutes in the query and stated in milliseconds in the message.
  val sizeMillis = size * 60 * 1000
  val offsetMillis = offset * 60 * 1000
  val expectedMessage =
    "TUMBLE table function parameters must satisfy abs(offset) < size, " +
      s"but got size $sizeMillis ms and offset $offsetMillis ms."
  val query =
    s"""
       |SELECT *
       |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$size' MINUTE, INTERVAL '$offset' MINUTE))
       |""".stripMargin

  assertThatThrownBy(() => util.verifyRelPlan(query))
    .hasCause(new ValidationException(expectedMessage))
}

/**
 * Checks that a CUMULATE window with a zero or negative step or max size is rejected: the call
 * to `verifyRelPlan` must throw an exception whose cause is a [[ValidationException]] reporting
 * both values in milliseconds.
 */
@ParameterizedTest(name = "{index}: {0}, {1}")
@CsvSource(Array[String]("-1, 1", "0, 2", "3, 0", "4, -3"))
def testCumulateWindowWithNonPositiveStepAndSize(step: Int, size: Int): Unit = {
  // The query declares the step in minutes and the max size in hours.
  val stepMillis = step * 1000 * 60
  val maxSizeMillis = size * 1000 * 60 * 60
  val expectedMessage =
    "CUMULATE table function based aggregate requires maxSize and step to be positive," +
      s" but got maxSize $maxSizeMillis ms and step $stepMillis ms."
  val query =
    s"""
       |SELECT *
       |FROM TABLE(
       | CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$step' MINUTE, INTERVAL '$size' HOUR))
       |""".stripMargin

  assertThatThrownBy(() => util.verifyRelPlan(query))
    .hasCause(new ValidationException(expectedMessage))
}

/**
 * Checks that a CUMULATE window whose max size is not an integral multiple of its step is
 * rejected: the call to `verifyRelPlan` must throw an exception whose cause is a
 * [[ValidationException]] reporting both values in milliseconds.
 */
@ParameterizedTest(name = "{index}: {0}, {1}")
@CsvSource(Array[String]("2, 3", "4, 7"))
def testCumulateWindowWithWrongStepAndSize(step: Int, size: Int): Unit = {
  // Both intervals are declared in seconds in the query and stated in milliseconds in the message.
  val stepMillis = step * 1000
  val maxSizeMillis = size * 1000
  val expectedMessage =
    "CUMULATE table function based aggregate requires maxSize must " +
      s"be an integral multiple of step, but got maxSize $maxSizeMillis ms and step $stepMillis ms."
  val query =
    s"""
       |SELECT *
       |FROM TABLE(
       | CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$step' SECOND, INTERVAL '$size' SECOND))
       |""".stripMargin

  assertThatThrownBy(() => util.verifyRelPlan(query))
    .hasCause(new ValidationException(expectedMessage))
}

/**
 * Checks that a HOP window with a zero or negative slide or size is rejected: the call to
 * `verifyRelPlan` must throw an exception whose cause is a [[ValidationException]] reporting
 * both values in milliseconds.
 */
@ParameterizedTest(name = "{index}: {0}, {1}")
@CsvSource(Array[String]("-1, 1", "0, 2", "3, 0", "4, -3"))
def testHopWindowWithNonPositiveSlideAndSize(slide: Int, size: Int): Unit = {
  // Both intervals are declared in minutes in the query and stated in milliseconds in the message.
  val slideMillis = slide * 1000 * 60
  val sizeMillis = size * 1000 * 60
  val expectedMessage =
    "HOP table function based aggregate requires slide and size to be positive," +
      s" but got slide $slideMillis ms and size $sizeMillis ms."
  val query =
    s"""
       |SELECT *
       |FROM TABLE(
       | HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$slide' MINUTE, INTERVAL '$size' MINUTE))
       |""".stripMargin

  assertThatThrownBy(() => util.verifyRelPlan(query))
    .hasCause(new ValidationException(expectedMessage))
}

/**
 * Checks that a SESSION window declared with a zero or negative gap is rejected: the call to
 * `verifyRelPlan` must throw an exception whose cause is a [[ValidationException]] carrying the
 * offending gap in milliseconds.
 */
@ParameterizedTest(name = "{index}: {0}")
@ValueSource(ints = Array[Int](-1, 0))
def testSessionWindowWithNonPositiveGap(gap: Int): Unit = {
  // The query declares the gap in minutes; the expected message states it in milliseconds.
  val gapMillis = gap * 1000 * 60
  val expectedMessage =
    "SESSION table function based aggregate requires gap to be positive," +
      s" but got gap $gapMillis ms."
  val query =
    s"""
       |SELECT *
       |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$gap' MINUTE))
       |""".stripMargin

  assertThatThrownBy(() => util.verifyRelPlan(query))
    .hasCause(new ValidationException(expectedMessage))
}

private def enableMiniBatch(): Unit = {
util.tableConfig.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
| b,
| count(distinct c) AS uv
|FROM TABLE(
| CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '25' MINUTE, INTERVAL '1' HOUR))
| CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '20' MINUTE, INTERVAL '1' HOUR))
|GROUP BY GROUPING SETS ((a), (b)), window_start, window_end
""".stripMargin
util.verifyRelPlan(sql)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private TumblingSliceAssigner(
checkArgument(
Math.abs(offset) < size,
String.format(
"Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",
"Tumbling Window parameters must satisfy abs(offset) < size, but got size %dms and offset %dms.",
size, offset));
this.size = size;
this.offset = offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void testInvalidParameters(final ZoneId zoneId) {
() ->
SliceAssigners.tumbling(0, zoneId, Duration.ofSeconds(10))
.withOffset(Duration.ofSeconds(20)),
"Tumbling Window parameters must satisfy abs(offset) < size, bot got size 10000ms and offset 20000ms.");
"Tumbling Window parameters must satisfy abs(offset) < size, but got size 10000ms and offset 20000ms.");

// should pass
SliceAssigners.tumbling(0, zoneId, Duration.ofSeconds(10))
Expand Down

0 comments on commit 18c6caf

Please sign in to comment.