From 83ae36a854ffe37b5ea16a9cb0494b8651123f02 Mon Sep 17 00:00:00 2001
From: Wink <809097465@qq.com>
Date: Wed, 19 Feb 2025 09:58:37 +0800
Subject: [PATCH] [FLINK-37201] [transform] Transform arithmetic functions
support nullable parameters and more numerical types
This closes #3881.
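
The arithmetic helpers in SystemFunctionUtils now take boxed parameter
types, return NULL for NULL input instead of failing during unboxing, and
cover TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE and DECIMAL. A minimal
sketch of the pattern, not the full implementation:

    // Boxed parameter, explicit null check, then the original math.
    public final class NullableArithmeticSketch {

        /** CEIL on a nullable DOUBLE column: NULL in, NULL out. */
        public static Double ceil(Double value) {
            if (value == null) {
                return null;
            }
            return Math.ceil(value);
        }

        /** Integral inputs are already whole numbers, so CEIL is the identity. */
        public static Integer ceil(Integer value) {
            return value;
        }
    }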
---
.../content.zh/docs/core-concept/transform.md | 2 +-
docs/content/docs/core-concept/transform.md | 24 +-
.../flink/FlinkPipelineTransformITCase.java | 287 +++++++++++++++++-
.../functions/SystemFunctionUtils.java | 236 ++++++++------
.../data/writer/AbstractBinaryWriter.java | 2 -
.../transform/PostTransformOperatorTest.java | 3 +-
.../runtime/parser/TransformParserTest.java | 3 +
7 files changed, 445 insertions(+), 112 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 190bcbd31fd..ef87bd35d97 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -127,7 +127,7 @@ Flink CDC 使用 [Calcite](https://calcite.apache.org/) 来解析表达式并且
| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. |
| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. |
| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. |
-| CEIL(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
+| CEIL(numeric)<br>CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. |
| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. |
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index b04e1d7635c..26a2ec6ea78 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -119,18 +119,18 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
## Arithmetic Functions
-| Function | Janino Code | Description |
-|----------------------|-----------------------------|-----------------------------------------------------------------|
-| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. |
-| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. |
-| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. |
-| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. |
-| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. |
-| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. |
-| CEIL(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
-| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
-| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. |
-| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. |
+| Function | Janino Code | Description |
+|------------------------------------|-----------------------------|-----------------------------------------------------------------|
+| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. |
+| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. |
+| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. |
+| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. |
+| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. |
+| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. |
+| CEIL(numeric)<br>CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
+| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
+| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. |
+| UUID() | uuid() | Returns a UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. |
## String Functions
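
A small worked example of the semantics documented above, using the
SystemFunctionUtils helpers changed later in this patch: CEILING resolves to
the same ceil implementation, ROUND rounds HALF_UP to the requested number of
decimal places, and a NULL argument yields NULL. The driver class is
illustrative only:

    import org.apache.flink.cdc.runtime.functions.SystemFunctionUtils;

    public class ArithmeticDocExample {
        public static void main(String[] args) {
            // CEIL(numeric) / CEILING(numeric) both evaluate ceil(numeric).
            System.out.println(SystemFunctionUtils.ceil(Double.valueOf(2.4)));           // 3.0
            // ROUND(numeric, int): HALF_UP to two decimal places here.
            System.out.println(SystemFunctionUtils.round(Double.valueOf(3.1415926), 2)); // 3.14
            // Nullable input: NULL in, NULL out.
            System.out.println(SystemFunctionUtils.ceil((Double) null));                 // null
        }
    }
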
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 2ac394add49..4426374b6ab 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -766,7 +766,7 @@ void testBuiltinArithmeticFunctions(ValuesDataSink.SinkApi sinkApi) throws Excep
+ "CAST(id AS INT) % 3 AS col5, ABS(id - 17) AS col6, "
+ "CEIL(CAST(id AS DOUBLE) / 1.7) AS col7, "
+ "FLOOR(CAST(id AS DOUBLE) / 1.7) AS col8, "
- + "ROUND(CAST(id AS DOUBLE) / 1.7) AS col9, "
+ + "ROUND(CAST(id AS DOUBLE) / 1.7, 0) AS col9, "
+ "CHAR_LENGTH(UUID()) AS col10",
null,
null,
@@ -2184,6 +2184,291 @@ void testTransformWithLargeLiterals() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}");
}
+ @Test
+ void testFloorCeilAndRoundFunction() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+ List<Event> events = generateFloorCeilAndRoundEvents(tableId);
+
+ ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+ "\\.*.\\.*.\\.*",
+ "*, "
+ + "CEIL(tinyint_col) AS ceil_tinyint,"
+ + "CEIL(smallint_col) AS ceil_smallint,"
+ + "CEIL(int_col) AS ceil_int,"
+ + "CEIL(bigint_col) AS ceil_bigint,"
+ + "CEIL(float_col) AS ceil_float,"
+ + "CEIL(double_col) AS ceil_double,"
+ + "CEIL(decimal_col) AS ceil_decimal,"
+ + "CEILING(tinyint_col) AS ceiling_tinyint,"
+ + "CEILING(smallint_col) AS ceiling_smallint,"
+ + "CEILING(int_col) AS ceiling_int,"
+ + "CEILING(bigint_col) AS ceiling_bigint,"
+ + "CEILING(float_col) AS ceiling_float,"
+ + "CEILING(double_col) AS ceiling_double,"
+ + "CEILING(decimal_col) AS ceiling_decimal,"
+ + "FLOOR(tinyint_col) AS floor_tinyint,"
+ + "FLOOR(smallint_col) AS floor_smallint,"
+ + "FLOOR(int_col) AS floor_int,"
+ + "FLOOR(bigint_col) AS floor_bigint,"
+ + "FLOOR(float_col) AS floor_float,"
+ + "FLOOR(double_col) AS floor_double,"
+ + "FLOOR(decimal_col) AS floor_decimal,"
+ + "ROUND(tinyint_col, 2) AS round_tinyint,"
+ + "ROUND(smallint_col, 2) AS round_smallint,"
+ + "ROUND(int_col, 2) AS round_int,"
+ + "ROUND(bigint_col, 2) AS round_bigint,"
+ + "ROUND(float_col, 2) AS round_float,"
+ + "ROUND(double_col, 2) AS round_double,"
+ + "ROUND(decimal_col, 2) AS round_decimal,"
+ + "ROUND(tinyint_col, 0) AS round_0_tinyint,"
+ + "ROUND(smallint_col, 0) AS round_0_smallint,"
+ + "ROUND(int_col, 0) AS round_0_int,"
+ + "ROUND(bigint_col, 0) AS round_0_bigint,"
+ + "ROUND(float_col, 0) AS round_0_float,"
+ + "ROUND(double_col, 0) AS round_0_double,"
+ + "ROUND(decimal_col, 0) AS round_0_decimal",
+ null,
+ "id",
+ null,
+ null,
+ null,
+ null)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ assertThat(outputEvents)
+ .containsExactly(
+ "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`tinyint_col` TINYINT,`smallint_col` SMALLINT,`int_col` INT,`bigint_col` BIGINT,`float_col` FLOAT,`double_col` DOUBLE,`decimal_col` DECIMAL(10, 3),`ceil_tinyint` TINYINT,`ceil_smallint` SMALLINT,`ceil_int` INT,`ceil_bigint` BIGINT,`ceil_float` FLOAT,`ceil_double` DOUBLE,`ceil_decimal` DECIMAL(10, 0),`ceiling_tinyint` TINYINT,`ceiling_smallint` SMALLINT,`ceiling_int` INT,`ceiling_bigint` BIGINT,`ceiling_float` FLOAT,`ceiling_double` DOUBLE,`ceiling_decimal` DECIMAL(10, 0),`floor_tinyint` TINYINT,`floor_smallint` SMALLINT,`floor_int` INT,`floor_bigint` BIGINT,`floor_float` FLOAT,`floor_double` DOUBLE,`floor_decimal` DECIMAL(10, 0),`round_tinyint` TINYINT,`round_smallint` SMALLINT,`round_int` INT,`round_bigint` BIGINT,`round_float` FLOAT,`round_double` DOUBLE,`round_decimal` DECIMAL(10, 2),`round_0_tinyint` TINYINT,`round_0_smallint` SMALLINT,`round_0_int` INT,`round_0_bigint` BIGINT,`round_0_float` FLOAT,`round_0_double` DOUBLE,`round_0_decimal` DECIMAL(8, 0)}, primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 1, 1, 1, 1, 1.1, 1.1, 1.100, 1, 1, 1, 1, 2.0, 2.0, 2, 1, 1, 1, 1, 2.0, 2.0, 2, 1, 1, 1, 1, 1.0, 1.0, 1, 1, 1, 1, 1, 1.1, 1.1, 1.10, 1, 1, 1, 1, 1.0, 1.0, 1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4, 4, 4, 4, 4, 4.44, 4.44, 4.440, 4, 4, 4, 4, 5.0, 5.0, 5, 4, 4, 4, 4, 5.0, 5.0, 5, 4, 4, 4, 4, 4.0, 4.0, 4, 4, 4, 4, 4, 4.44, 4.44, 4.44, 4, 4, 4, 4, 4.0, 4.0, 4], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5, 5, 5, 5, 5, 5.555, 5.555, 5.555, 5, 5, 5, 5, 6.0, 6.0, 6, 5, 5, 5, 5, 6.0, 6.0, 6, 5, 5, 5, 5, 5.0, 5.0, 5, 5, 5, 5, 5, 5.56, 5.56, 5.56, 5, 5, 5, 5, 6.0, 6.0, 6], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9, 9, 9, 9, 9, 1.0E7, 9999999.999, 9999999.999, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000, 9, 9, 9, 9, 1.0E7, 9999999.0, 9999999, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000.00, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[0, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
+ }
+
+ @Test
+ void testAbsFunction() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+ List<Event> events = generateAbsEvents(tableId);
+
+ ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+ "\\.*.\\.*.\\.*",
+ "*, "
+ + "ABS(tinyint_col) AS abs_tinyint,"
+ + "ABS(smallint_col) AS abs_smallint,"
+ + "ABS(int_col) AS abs_int,"
+ + "ABS(bigint_col) AS abs_bigint,"
+ + "ABS(float_col) AS abs_float,"
+ + "ABS(double_col) AS abs_double,"
+ + "ABS(decimal_col) AS abs_decimal",
+ null,
+ "id",
+ null,
+ null,
+ null,
+ null)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ assertThat(outputEvents)
+ .containsExactly(
+ "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`tinyint_col` TINYINT,`smallint_col` SMALLINT,`int_col` INT,`bigint_col` BIGINT,`float_col` FLOAT,`double_col` DOUBLE,`decimal_col` DECIMAL(10, 2),`abs_tinyint` TINYINT,`abs_smallint` SMALLINT,`abs_int` INT,`abs_bigint` BIGINT,`abs_float` FLOAT,`abs_double` DOUBLE,`abs_decimal` DECIMAL(10, 2)}, primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 1, 1, 1, 1, 1.1, 1.1, 1.10, 1, 1, 1, 1, 1.1, 1.1, 1.10], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[-4, -4, -4, -4, -4, -4.44, -4.44, -4.44, 4, 4, 4, 4, 4.44, 4.44, 4.44], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[-9, -9, -9, -9, -9, -1.0E8, -9.999999999E7, -99999999.99, 9, 9, 9, 9, 1.0E8, 9.999999999E7, 99999999.99], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[0, null, null, null, null, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
+ }
+
+ private List<Event> generateFloorCeilAndRoundEvents(TableId tableId) {
+ List<Event> events = new ArrayList<>();
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("tinyint_col", DataTypes.TINYINT())
+ .physicalColumn("smallint_col", DataTypes.SMALLINT())
+ .physicalColumn("int_col", DataTypes.INT())
+ .physicalColumn("bigint_col", DataTypes.BIGINT())
+ .physicalColumn("float_col", DataTypes.FLOAT())
+ .physicalColumn("double_col", DataTypes.DOUBLE())
+ .physicalColumn("decimal_col", DataTypes.DECIMAL(10, 3))
+ .primaryKey("id")
+ .build();
+
+ events.add(new CreateTableEvent(tableId, schema));
+
+ Stream.of(
+ generate(
+ schema,
+ 1,
+ (byte) 1,
+ (short) 1,
+ 1,
+ 1L,
+ 1.1f,
+ 1.1d,
+ DecimalData.fromBigDecimal(new BigDecimal("1.1"), 10, 3)),
+ generate(
+ schema,
+ 4,
+ (byte) 4,
+ (short) 4,
+ 4,
+ 4L,
+ 4.44f,
+ 4.44d,
+ DecimalData.fromBigDecimal(new BigDecimal("4.44"), 10, 3)),
+ generate(
+ schema,
+ 5,
+ (byte) 5,
+ (short) 5,
+ 5,
+ 5L,
+ 5.555f,
+ 5.555d,
+ DecimalData.fromBigDecimal(new BigDecimal("5.555"), 10, 3)),
+ generate(
+ schema,
+ 9,
+ (byte) 9,
+ (short) 9,
+ 9,
+ 9L,
+ 9999999.999f,
+ 9999999.999d,
+ DecimalData.fromBigDecimal(new BigDecimal("9999999.999"), 10, 3)),
+ generate(schema, 0, null, null, null, null, null, null, null))
+ .map(rec -> DataChangeEvent.insertEvent(tableId, rec))
+ .forEach(events::add);
+ return events;
+ }
+
+ private List<Event> generateAbsEvents(TableId tableId) {
+ List<Event> events = new ArrayList<>();
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("tinyint_col", DataTypes.TINYINT())
+ .physicalColumn("smallint_col", DataTypes.SMALLINT())
+ .physicalColumn("int_col", DataTypes.INT())
+ .physicalColumn("bigint_col", DataTypes.BIGINT())
+ .physicalColumn("float_col", DataTypes.FLOAT())
+ .physicalColumn("double_col", DataTypes.DOUBLE())
+ .physicalColumn("decimal_col", DataTypes.DECIMAL(10, 2))
+ .primaryKey("id")
+ .build();
+
+ events.add(new CreateTableEvent(tableId, schema));
+
+ Stream.of(
+ generate(
+ schema,
+ 1,
+ (byte) 1,
+ (short) 1,
+ 1,
+ 1L,
+ 1.1f,
+ 1.1d,
+ DecimalData.fromBigDecimal(new BigDecimal("1.1"), 10, 2)),
+ generate(
+ schema,
+ -4,
+ (byte) -4,
+ (short) -4,
+ -4,
+ -4L,
+ -4.44f,
+ -4.44d,
+ DecimalData.fromBigDecimal(new BigDecimal("-4.44"), 10, 2)),
+ generate(
+ schema,
+ -9,
+ (byte) -9,
+ (short) -9,
+ -9,
+ -9L,
+ -99999999.99f,
+ -99999999.99d,
+ DecimalData.fromBigDecimal(new BigDecimal("-99999999.99"), 10, 2)),
+ generate(schema, 0, null, null, null, null, null, null, null))
+ .map(rec -> DataChangeEvent.insertEvent(tableId, rec))
+ .forEach(events::add);
+ return events;
+ }
+
private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
List<Event> events = new ArrayList<>();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 7fc7482e6ab..ac91e7f2459 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -518,168 +518,214 @@ public static String lower(String str) {
}
/** SQL ABS operator applied to byte values. */
- public static byte abs(byte b0) {
- return (byte) Math.abs(b0);
+ public static Byte abs(Byte value) {
+ if (value == null) {
+ return null;
+ }
+ return (byte) Math.abs(value);
}
/** SQL ABS operator applied to short values. */
- public static short abs(short b0) {
- return (short) Math.abs(b0);
+ public static Short abs(Short value) {
+ if (value == null) {
+ return null;
+ }
+ return (short) Math.abs(value);
}
/** SQL ABS operator applied to int values. */
- public static int abs(int b0) {
- return Math.abs(b0);
+ public static Integer abs(Integer value) {
+ if (value == null) {
+ return null;
+ }
+ return Math.abs(value);
}
/** SQL ABS operator applied to long values. */
- public static long abs(long b0) {
- return Math.abs(b0);
+ public static Long abs(Long value) {
+ if (value == null) {
+ return null;
+ }
+ return Math.abs(value);
}
/** SQL ABS operator applied to float values. */
- public static float abs(float b0) {
- return Math.abs(b0);
+ public static Float abs(Float value) {
+ if (value == null) {
+ return null;
+ }
+ return Math.abs(value);
}
/** SQL ABS operator applied to double values. */
- public static double abs(double b0) {
- return Math.abs(b0);
+ public static Double abs(Double value) {
+ if (value == null) {
+ return null;
+ }
+ return Math.abs(value);
}
- public static double floor(double b0) {
- return Math.floor(b0);
+ /** SQL ABS operator applied to decimal values. */
+ public static DecimalData abs(DecimalData value) {
+ if (value == null) {
+ return null;
+ }
+ return DecimalData.fromBigDecimal(
+ BigDecimal.valueOf(Math.abs(value.toBigDecimal().doubleValue())),
+ value.precision(),
+ value.scale());
}
- public static float floor(float b0) {
- return (float) Math.floor(b0);
+ public static Byte floor(Byte value) {
+ return value;
}
- /** SQL FLOOR operator applied to int values. */
- public static int floor(int b0, int b1) {
- int r = b0 % b1;
- if (r < 0) {
- r += b1;
- }
- return b0 - r;
+ public static Short floor(Short value) {
+ return value;
}
- /** SQL FLOOR operator applied to long values. */
- public static long floor(long b0, long b1) {
- long r = b0 % b1;
- if (r < 0) {
- r += b1;
- }
- return b0 - r;
+ public static Integer floor(Integer value) {
+ return value;
}
- public static double ceil(double b0) {
- return Math.ceil(b0);
+ public static Long floor(Long value) {
+ return value;
}
- public static float ceil(float b0) {
- return (float) Math.ceil(b0);
+ public static Double floor(Double value) {
+ if (value == null) {
+ return null;
+ }
+ return Math.floor(value);
}
- /** SQL CEIL operator applied to int values. */
- public static int ceil(int b0, int b1) {
- int r = b0 % b1;
- if (r > 0) {
- r -= b1;
+ public static Float floor(Float value) {
+ if (value == null) {
+ return null;
}
- return b0 - r;
+ return (float) Math.floor(value);
}
- /** SQL CEIL operator applied to long values. */
- public static long ceil(long b0, long b1) {
- return floor(b0 + b1 - 1, b1);
+ public static DecimalData floor(DecimalData value) {
+ if (value == null) {
+ return null;
+ }
+ return DecimalData.fromBigDecimal(
+ BigDecimal.valueOf(Math.floor(value.toBigDecimal().doubleValue())),
+ value.precision(),
+ 0);
}
- // SQL ROUND
- /** SQL ROUND operator applied to byte values. */
- public static byte round(byte b0) {
- return round(b0, 0);
+ public static Byte ceil(Byte value) {
+ return value;
}
- /** SQL ROUND operator applied to byte values. */
- public static byte round(byte b0, int b1) {
- return round(BigDecimal.valueOf(b0), b1).byteValue();
+ public static Short ceil(Short value) {
+ return value;
}
- /** SQL ROUND operator applied to short values. */
- public static short round(short b0) {
- return round(b0, 0);
+ public static Integer ceil(Integer value) {
+ return value;
}
- /** SQL ROUND operator applied to short values. */
- public static short round(short b0, int b1) {
- return round(BigDecimal.valueOf(b0), b1).shortValue();
+ public static Long ceil(Long value) {
+ return value;
}
- /** SQL ROUND operator applied to int values. */
- public static int round(int b0) {
- return round(b0, 0);
+ public static Double ceil(Double value) {
+ if (value == null) {
+ return null;
+ }
+ return Math.ceil(value);
}
- /** SQL ROUND operator applied to int values. */
- public static int round(int b0, int b1) {
- return round(BigDecimal.valueOf(b0), b1).intValue();
+ public static Float ceil(Float value) {
+ if (value == null) {
+ return null;
+ }
+ return (float) Math.ceil(value);
}
- /** SQL ROUND operator applied to long values. */
- public static long round(long b0) {
- return round(b0, 0);
+ public static DecimalData ceil(DecimalData value) {
+ if (value == null) {
+ return null;
+ }
+ return DecimalData.fromBigDecimal(
+ BigDecimal.valueOf(Math.ceil(value.toBigDecimal().doubleValue())),
+ value.precision(),
+ 0);
}
- /** SQL ROUND operator applied to long values. */
- public static long round(long b0, int b1) {
- return round(BigDecimal.valueOf(b0), b1).longValue();
+ // SQL ROUND
+ /** SQL ROUND operator applied to byte values. */
+ public static Byte round(Byte value, int pointOffset) {
+ if (value == null) {
+ return null;
+ }
+ return round(BigDecimal.valueOf(value), pointOffset).byteValue();
}
- /** SQL ROUND operator applied to BigDecimal values. */
- public static BigDecimal round(BigDecimal b0) {
- return round(b0, 0);
+ /** SQL ROUND operator applied to short values. */
+ public static Short round(Short value, int pointOffset) {
+ if (value == null) {
+ return null;
+ }
+ return round(BigDecimal.valueOf(value), pointOffset).shortValue();
}
- /** SQL ROUND operator applied to BigDecimal values. */
- public static DecimalData round(DecimalData b0) {
- return round(b0, 0);
+ /** SQL ROUND operator applied to int values. */
+ public static Integer round(Integer value, int pointOffset) {
+ if (value == null) {
+ return null;
+ }
+ return round(BigDecimal.valueOf(value), pointOffset).intValue();
}
- /** SQL ROUND operator applied to BigDecimal values. */
- public static BigDecimal round(BigDecimal b0, int b1) {
- return b0.movePointRight(b1).setScale(0, RoundingMode.HALF_UP).movePointLeft(b1);
+ /** SQL ROUND operator applied to long values. */
+ public static Long round(Long value, int pointOffset) {
+ if (value == null) {
+ return null;
+ }
+ return round(BigDecimal.valueOf(value), pointOffset).longValue();
}
/** SQL ROUND operator applied to DecimalData values. */
- public static DecimalData round(DecimalData b0, int b1) {
+ public static DecimalData round(DecimalData value, int pointOffset) {
+ if (value == null) {
+ return null;
+ }
return DecimalData.fromBigDecimal(
- b0.toBigDecimal()
- .movePointRight(b1)
+ value.toBigDecimal()
+ .movePointRight(pointOffset)
.setScale(0, RoundingMode.HALF_UP)
- .movePointLeft(b1),
- b0.precision(),
- b0.scale());
- }
-
- /** SQL ROUND operator applied to float values. */
- public static float round(float b0) {
- return round(b0, 0);
+ .movePointLeft(pointOffset),
+ value.precision(),
+ pointOffset);
}
/** SQL ROUND operator applied to float values. */
- public static float round(float b0, int b1) {
- return round(BigDecimal.valueOf(b0), b1).floatValue();
+ public static Float round(Float value, int pointOffset) {
+ if (value == null) {
+ return null;
+ }
+ return round(new BigDecimal(value.toString()), pointOffset).floatValue();
}
/** SQL ROUND operator applied to double values. */
- public static double round(double b0) {
- return round(b0, 0);
+ public static Double round(Double value, int pointOffset) {
+ if (value == null) {
+ return null;
+ }
+ return round(BigDecimal.valueOf(value), pointOffset).doubleValue();
}
- /** SQL ROUND operator applied to double values. */
- public static double round(double b0, int b1) {
- return round(BigDecimal.valueOf(b0), b1).doubleValue();
+ private static BigDecimal round(BigDecimal value, int pointOffset) {
+ if (value == null) {
+ return null;
+ }
+ return value.movePointRight(pointOffset)
+ .setScale(0, RoundingMode.HALF_UP)
+ .movePointLeft(pointOffset);
}
public static String uuid() {
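
The DecimalData overloads above change the result scale as well as the value:
CEIL and FLOOR return scale 0, ROUND returns the requested number of decimal
places, and ABS keeps the input precision and scale. A rough check against the
ITCase expectations earlier in this patch, using the 5.555 DECIMAL(10, 3) row;
the wrapper class is illustrative only:

    import java.math.BigDecimal;

    import org.apache.flink.cdc.common.data.DecimalData;
    import org.apache.flink.cdc.runtime.functions.SystemFunctionUtils;

    public class DecimalScaleExample {
        public static void main(String[] args) {
            DecimalData d = DecimalData.fromBigDecimal(new BigDecimal("5.555"), 10, 3);
            System.out.println(SystemFunctionUtils.ceil(d));     // 6     -> DECIMAL(10, 0)
            System.out.println(SystemFunctionUtils.floor(d));    // 5     -> DECIMAL(10, 0)
            System.out.println(SystemFunctionUtils.round(d, 2)); // 5.56  -> DECIMAL(10, 2)
            System.out.println(SystemFunctionUtils.abs(d));      // 5.555 -> DECIMAL(10, 3)
        }
    }
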
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
index 8a7b4fa0477..dd75c2d12fa 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
@@ -145,8 +145,6 @@ public void writeBinary(int pos, byte[] bytes) {
@Override
public void writeDecimal(int pos, DecimalData value, int precision) {
- assert value == null || (value.precision() <= precision);
-
if (DecimalData.isCompact(precision)) {
assert value != null;
writeLong(pos, value.toUnscaledLong());
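
The removed assertion appears too strict once the helpers above are in play:
the runtime DecimalData may carry a larger precision than the column type
inferred for the expression. In the ITCase earlier in this patch, for example,
ROUND(decimal_col, 0) on a DECIMAL(10, 3) column is inferred as DECIMAL(8, 0),
while round() keeps the input precision of 10. A sketch of that mismatch (the
wrapper class is illustrative only):

    import java.math.BigDecimal;

    import org.apache.flink.cdc.common.data.DecimalData;
    import org.apache.flink.cdc.runtime.functions.SystemFunctionUtils;

    public class PrecisionMismatchExample {
        public static void main(String[] args) {
            DecimalData d = DecimalData.fromBigDecimal(new BigDecimal("5.555"), 10, 3);
            DecimalData rounded = SystemFunctionUtils.round(d, 0); // value 6
            // rounded.precision() is 10, but the sink column for round_0_decimal
            // is DECIMAL(8, 0), so the old `value.precision() <= precision`
            // check in writeDecimal would have failed.
            System.out.println(rounded.precision()); // 10
        }
    }
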
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 11ae3ee5a46..07ae5369b50 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -2967,8 +2967,9 @@ void testBuildInFunctionTransform() throws Exception {
testExpressionConditionTransform("'123' not like '^[a-zA-Z]'");
testExpressionConditionTransform("abs(2) = 2");
testExpressionConditionTransform("ceil(2.4) = 3.0");
+ testExpressionConditionTransform("ceiling(2.4) = 3.0");
testExpressionConditionTransform("floor(2.5) = 2.0");
- testExpressionConditionTransform("round(3.1415926,2) = 3.14");
+ testExpressionConditionTransform("round(3.1415926, 2) = 3.14");
testExpressionConditionTransform("IF(2>0,1,0) = 1");
testExpressionConditionTransform("COALESCE(null,1,2) = 1");
testExpressionConditionTransform("1 + 1 = 2");
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 45ce576ec96..9f6e0ecdf96 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -228,6 +228,7 @@ public void testTranslateFilterToJaninoExpression() {
testFilterExpression("id not like '^[a-zA-Z]'", "notLike(id, \"^[a-zA-Z]\")");
testFilterExpression("abs(2)", "abs(2)");
testFilterExpression("ceil(2)", "ceil(2)");
+ testFilterExpression("ceiling(2)", "ceil(2)");
testFilterExpression("floor(2)", "floor(2)");
testFilterExpression("round(2,2)", "round(2, 2)");
testFilterExpression("uuid()", "uuid()");
@@ -630,6 +631,8 @@ public void testTranslateUdfFilterToJaninoExpression() {
"typeof(abs(2))", "__instanceOfTypeOfFunctionClass.eval(abs(2))");
testFilterExpressionWithUdf(
"typeof(ceil(2))", "__instanceOfTypeOfFunctionClass.eval(ceil(2))");
+ testFilterExpressionWithUdf(
+ "typeof(ceiling(2))", "__instanceOfTypeOfFunctionClass.eval(ceil(2))");
testFilterExpressionWithUdf(
"typeof(floor(2))", "__instanceOfTypeOfFunctionClass.eval(floor(2))");
testFilterExpressionWithUdf(