Backport #11702 to Flink 1.19 and 1.18 #12080

Open · wants to merge 1 commit into base: main
FlinkSink.java

@@ -33,6 +33,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -656,8 +657,17 @@ private DataStream<RowData> distributeDataStream(
 
         return shuffleStream
             .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
-            .filter(StatisticsOrRecord::hasRecord)
-            .map(StatisticsOrRecord::record);
+            .flatMap(
+                (FlatMapFunction<StatisticsOrRecord, RowData>)
+                    (statisticsOrRecord, out) -> {
+                      if (statisticsOrRecord.hasRecord()) {
+                        out.collect(statisticsOrRecord.record());
+                      }
+                    })
+            // Set the parallelism same as writerParallelism to
+            // promote operator chaining with the downstream writer operator
+            .setParallelism(writerParallelism)
+            .returns(RowData.class);
 
       default:
         throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
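Background for the change above (editor's note, not part of the diff): Flink chains two adjacent operators into a single task only when, among other conditions, they run at the same parallelism with forward partitioning. The removed .filter(...).map(...) operators inherited the default job parallelism, so whenever the writer ran at a different writeParallelism the chain broke and every record crossed an extra serialization boundary. A minimal sketch of that chaining rule, assuming nothing beyond the stock DataStream API; writerParallelism and the print() "writer" are illustrative stand-ins, not the PR's code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4); // default parallelism inherited by every operator

    int writerParallelism = 2; // stand-in for the sink's writer parallelism

    env.fromElements("a", "b", "c")
        .rebalance() // forces an exchange, playing the role of the range partitioner
        .flatMap((FlatMapFunction<String, String>) (value, out) -> out.collect(value))
        .returns(String.class) // lambda type erasure requires an explicit result type
        // matching the downstream parallelism is what lets Flink chain this
        // operator with the "writer" below into one task
        .setParallelism(writerParallelism)
        .print() // stand-in for the downstream writer operator
        .setParallelism(writerParallelism);

    env.execute("chaining-sketch");
  }
}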
TestFlinkIcebergSinkDistributionMode.java

@@ -82,13 +82,18 @@ public class TestFlinkIcebergSinkDistributionMode extends TestFlinkIcebergSinkBase {
   @Parameter(index = 1)
   private boolean partitioned;
 
-  @Parameters(name = "parallelism = {0}, partitioned = {1}")
+  @Parameter(index = 2)
+  private int writeParallelism;
+
+  @Parameters(name = "parallelism = {0}, partitioned = {1}, writeParallelism = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {1, true},
-      {1, false},
-      {2, true},
-      {2, false}
+      {1, true, 1},
+      {1, false, 1},
+      {2, true, 2},
+      {2, false, 2},
+      {1, true, 2},
+      {1, false, 2},
     };
   }

@@ -110,7 +115,7 @@ public void before() throws IOException {
             MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
         .enableCheckpointing(100)
         .setParallelism(parallelism)
-        .setMaxParallelism(parallelism);
+        .setMaxParallelism(Math.max(parallelism, writeParallelism));
 
     this.tableLoader = CATALOG_EXTENSION.tableLoader();
   }
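One reading of the Math.max above (an inference, not stated in the PR): Flink rejects a job graph in which an operator's parallelism exceeds the configured max parallelism, so once writeParallelism may be larger than parallelism, the cap must cover both. A hypothetical sketch of the failure the change avoids:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setMaxParallelism(1); // too small once any operator asks for 2

env.fromElements(1, 2, 3)
    .keyBy(i -> i % 2)
    .reduce(Integer::sum)
    .setParallelism(2) // exceeds maxParallelism(1), so job translation fails
    .print();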
@@ -180,7 +185,7 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
+        .writeParallelism(writeParallelism)
         .setAll(newProps);
 
     assertThatThrownBy(builder::append)
@@ -206,7 +211,7 @@ public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism);
+        .writeParallelism(writeParallelism);
 
     // Range distribution requires either sort order or partition spec defined
     assertThatThrownBy(builder::append)
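Context for the assertion above (hypothetical usage, not part of the diff): RANGE distribution needs columns to range-partition on, so the table must carry a sort order or a partition spec before append() is called. A sketch of supplying one via the Iceberg API:

// Give write.distribution-mode=range a sort key; without this (or a
// partition spec) builder.append() throws, as the test asserts.
table.replaceSortOrder().asc("id").commit();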
@@ -233,7 +238,7 @@ public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism);
+        .writeParallelism(writeParallelism);
 
     // sort based on partition columns
     builder.append();
@@ -307,7 +312,7 @@ public void testRangeDistributionWithSortOrder() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
+        .writeParallelism(writeParallelism)
         .rangeDistributionStatisticsType(StatisticsType.Map)
         .append();
     env.execute(getClass().getSimpleName());
@@ -343,9 +348,9 @@ public void testRangeDistributionWithSortOrder() throws Exception {
     List<DataFile> addedDataFiles =
         Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
     // each writer task should only write one file for non-partition sort column
-    assertThat(addedDataFiles).hasSize(parallelism);
+    assertThat(addedDataFiles).hasSize(writeParallelism);
     // verify there is no overlap in min-max stats range
-    if (parallelism == 2) {
+    if (writeParallelism > 1) {
       assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
     }
   }
@@ -368,7 +373,7 @@ public void testRangeDistributionSketchWithSortOrder() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
+        .writeParallelism(writeParallelism)
         .rangeDistributionStatisticsType(StatisticsType.Sketch)
         .append();
     env.execute(getClass().getSimpleName());
@@ -399,9 +404,9 @@ public void testRangeDistributionSketchWithSortOrder() throws Exception {
     List<DataFile> addedDataFiles =
         Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
     // each writer task should only write one file for non-partition sort column
-    assertThat(addedDataFiles).hasSize(parallelism);
+    assertThat(addedDataFiles).hasSize(writeParallelism);
     // verify there is no overlap in min-max stats range
-    if (parallelism == 2) {
+    if (writeParallelism > 1) {
       assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
     }
   }
@@ -437,7 +442,7 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
+        .writeParallelism(writeParallelism)
         .rangeDistributionStatisticsType(StatisticsType.Auto)
         .append();
     env.execute(getClass().getSimpleName());
@@ -469,9 +474,9 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
         Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
     // each writer task should only write one file for non-partition sort column
     // sometimes
-    assertThat(addedDataFiles).hasSize(parallelism);
+    assertThat(addedDataFiles).hasSize(writeParallelism);
     // verify there is no overlap in min-max stats range
-    if (parallelism == 2) {
+    if (writeParallelism > 1) {
       assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
     }
   }
FlinkSink.java (same change, second backported Flink version)

@@ -33,6 +33,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -655,8 +656,17 @@ private DataStream<RowData> distributeDataStream(
 
         return shuffleStream
             .partitionCustom(new RangePartitioner(iSchema, sortOrder), r -> r)
-            .filter(StatisticsOrRecord::hasRecord)
-            .map(StatisticsOrRecord::record);
+            .flatMap(
+                (FlatMapFunction<StatisticsOrRecord, RowData>)
+                    (statisticsOrRecord, out) -> {
+                      if (statisticsOrRecord.hasRecord()) {
+                        out.collect(statisticsOrRecord.record());
+                      }
+                    })
+            // Set the parallelism same as writerParallelism to
+            // promote operator chaining with the downstream writer operator
+            .setParallelism(writerParallelism)
+            .returns(RowData.class);
 
       default:
         throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
TestFlinkIcebergSinkDistributionMode.java (same change, second backported Flink version)

@@ -82,13 +82,18 @@ public class TestFlinkIcebergSinkDistributionMode extends TestFlinkIcebergSinkBase {
   @Parameter(index = 1)
   private boolean partitioned;
 
-  @Parameters(name = "parallelism = {0}, partitioned = {1}")
+  @Parameter(index = 2)
+  private int writeParallelism;
+
+  @Parameters(name = "parallelism = {0}, partitioned = {1}, writeParallelism = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {1, true},
-      {1, false},
-      {2, true},
-      {2, false}
+      {1, true, 1},
+      {1, false, 1},
+      {2, true, 2},
+      {2, false, 2},
+      {1, true, 2},
+      {1, false, 2},
     };
   }

@@ -110,7 +115,7 @@ public void before() throws IOException {
             MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
         .enableCheckpointing(100)
         .setParallelism(parallelism)
-        .setMaxParallelism(parallelism);
+        .setMaxParallelism(Math.max(parallelism, writeParallelism));
 
     this.tableLoader = CATALOG_EXTENSION.tableLoader();
   }
@@ -180,7 +185,7 @@ public void testOverrideWriteConfigWithUnknownDistributionMode() {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
+        .writeParallelism(writeParallelism)
         .setAll(newProps);
 
     assertThatThrownBy(builder::append)
@@ -206,7 +211,7 @@ public void testRangeDistributionWithoutSortOrderUnpartitioned() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism);
+        .writeParallelism(writeParallelism);
 
     // Range distribution requires either sort order or partition spec defined
     assertThatThrownBy(builder::append)
@@ -233,7 +238,7 @@ public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism);
+        .writeParallelism(writeParallelism);
 
     // sort based on partition columns
     builder.append();
@@ -307,7 +312,7 @@ public void testRangeDistributionWithSortOrder() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
+        .writeParallelism(writeParallelism)
         .rangeDistributionStatisticsType(StatisticsType.Map)
         .append();
     env.execute(getClass().getSimpleName());
@@ -343,9 +348,9 @@ public void testRangeDistributionWithSortOrder() throws Exception {
     List<DataFile> addedDataFiles =
         Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
     // each writer task should only write one file for non-partition sort column
-    assertThat(addedDataFiles).hasSize(parallelism);
+    assertThat(addedDataFiles).hasSize(writeParallelism);
     // verify there is no overlap in min-max stats range
-    if (parallelism == 2) {
+    if (writeParallelism > 1) {
       assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
     }
   }
@@ -368,7 +373,7 @@ public void testRangeDistributionSketchWithSortOrder() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
+        .writeParallelism(writeParallelism)
         .rangeDistributionStatisticsType(StatisticsType.Sketch)
         .append();
     env.execute(getClass().getSimpleName());
@@ -399,9 +404,9 @@ public void testRangeDistributionSketchWithSortOrder() throws Exception {
     List<DataFile> addedDataFiles =
         Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
     // each writer task should only write one file for non-partition sort column
-    assertThat(addedDataFiles).hasSize(parallelism);
+    assertThat(addedDataFiles).hasSize(writeParallelism);
     // verify there is no overlap in min-max stats range
-    if (parallelism == 2) {
+    if (writeParallelism > 1) {
       assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
     }
   }
@@ -437,7 +442,7 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
     FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
         .table(table)
         .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
+        .writeParallelism(writeParallelism)
         .rangeDistributionStatisticsType(StatisticsType.Auto)
         .append();
     env.execute(getClass().getSimpleName());
@@ -469,9 +474,9 @@ public void testRangeDistributionStatisticsMigration() throws Exception {
         Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
     // each writer task should only write one file for non-partition sort column
     // sometimes
-    assertThat(addedDataFiles).hasSize(parallelism);
+    assertThat(addedDataFiles).hasSize(writeParallelism);
     // verify there is no overlap in min-max stats range
-    if (parallelism == 2) {
+    if (writeParallelism > 1) {
       assertIdColumnStatsNoRangeOverlap(addedDataFiles.get(0), addedDataFiles.get(1));
     }
   }