From b567ebcd721e2ee7bcfebb473b6a0087974c075d Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 20 Dec 2024 22:10:08 +0900 Subject: [PATCH] Fix failure when equality delete updated nested fields in Iceberg --- .../plugin/iceberg/IcebergPageSource.java | 30 ++++++++++++++----- .../iceberg/IcebergPageSourceProvider.java | 24 ++++++++++----- .../io/trino/plugin/iceberg/IcebergUtil.java | 15 +++++++--- .../plugin/iceberg/delete/RowPredicate.java | 5 ++-- .../trino/plugin/iceberg/TestIcebergV2.java | 4 +-- 5 files changed, 55 insertions(+), 23 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java index ec5bd9e55d85..0e465ba6e940 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSource.java @@ -26,12 +26,12 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Supplier; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Throwables.throwIfInstanceOf; import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; @@ -41,6 +41,7 @@ public class IcebergPageSource implements ConnectorPageSource { private final int[] expectedColumnIndexes; + private final int[] deleteFilterIndexes; private final ConnectorPageSource delegate; private final Optional projectionsAdapter; private final Supplier> deletePredicate; @@ -53,6 +54,8 @@ public class IcebergPageSource public IcebergPageSource( List expectedColumns, List requiredColumns, + List deleteFilterColumns, + OptionalInt rowPositionColumnIndex, ConnectorPageSource delegate, Optional projectionsAdapter, Supplier> deletePredicate, @@ -62,15 +65,26 @@ public IcebergPageSource( // requiredColumns should include all expectedColumns as well as any columns needed by the DeleteFilter requireNonNull(expectedColumns, "expectedColumns is null"); requireNonNull(requiredColumns, "requiredColumns is null"); + requireNonNull(deleteFilterColumns, "deleteFilterColumns is null"); + requireNonNull(rowPositionColumnIndex, "rowPositionColumnIndex is null"); this.expectedColumnIndexes = new int[expectedColumns.size()]; - for (int i = 0; i < expectedColumns.size(); i++) { - IcebergColumnHandle expectedColumn = expectedColumns.get(i); - checkArgument(expectedColumn.equals(requiredColumns.get(i)), "Expected columns must be a prefix of required columns"); - expectedColumnIndexes[i] = i; - - if (expectedColumn.isMergeRowIdColumn()) { + this.deleteFilterIndexes = new int[deleteFilterColumns.size()]; + for (int i = 0; i < requiredColumns.size(); i++) { + IcebergColumnHandle column = requiredColumns.get(i); + if (i < expectedColumns.size()) { + expectedColumnIndexes[i] = i; + } + if (column.isMergeRowIdColumn()) { this.rowIdColumnIndex = i; } + if (deleteFilterColumns.contains(column)) { + if (column.isRowPositionColumn()) { + this.deleteFilterIndexes[deleteFilterColumns.indexOf(column)] = rowPositionColumnIndex.orElseThrow(); + } + else { + this.deleteFilterIndexes[deleteFilterColumns.indexOf(column)] = i; + } + } } this.delegate = requireNonNull(delegate, "delegate is null"); @@ -120,7 +134,7 @@ public Page getNextPage() Optional deleteFilterPredicate = deletePredicate.get(); if (deleteFilterPredicate.isPresent()) { - dataPage = deleteFilterPredicate.get().filterPage(dataPage); + dataPage = deleteFilterPredicate.get().filterPage(dataPage, deleteFilterIndexes); } if (projectionsAdapter.isPresent()) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 4b5abd936d98..fd716bc38463 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -113,6 +113,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -184,7 +185,6 @@ import static java.util.function.Predicate.not; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; -import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; import static org.apache.iceberg.FileContent.EQUALITY_DELETES; import static org.apache.iceberg.FileContent.POSITION_DELETES; @@ -349,16 +349,12 @@ public ConnectorPageSource createPageSource( column -> ((IcebergColumnHandle) column).getType(), IcebergPageSourceProvider::applyProjection)); - List readColumns = dataPageSource.getReaderColumns() - .map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toList())) - .orElse(requiredColumns); - Supplier> deletePredicate = memoize(() -> getDeleteManager(partitionSpec, partitionData) .getDeletePredicate( path, dataSequenceNumber, deletes, - readColumns, + deleteFilterRequiredColumns.stream().collect(toImmutableList()), tableSchema, readerPageSourceWithRowPositions, (deleteFile, deleteColumns, tupleDomain) -> openDeletes(session, fileSystem, deleteFile, deleteColumns, tupleDomain))); @@ -366,6 +362,8 @@ public ConnectorPageSource createPageSource( return new IcebergPageSource( icebergColumns, requiredColumns, + deleteFilterRequiredColumns.stream().collect(toImmutableList()), + readerPageSourceWithRowPositions.rowPositionColumnIndex, dataPageSource.get(), projectionsAdapter, deletePredicate, @@ -618,6 +616,7 @@ private static ReaderPageSourceWithRowPositions createOrcPageSource( List projectedLayouts = new ArrayList<>(readBaseColumns.size()); List columnAdaptations = new ArrayList<>(readBaseColumns.size()); + OptionalInt rowPositionColumnIndex = OptionalInt.empty(); for (IcebergColumnHandle column : readBaseColumns) { verify(column.isBaseColumn(), "Column projections must be based from a root column"); OrcColumn orcColumn = fileColumnsByIcebergId.get(column.getId()); @@ -643,6 +642,7 @@ else if (column.isMergeRowIdColumn()) { } else if (column.isRowPositionColumn()) { columnAdaptations.add(ColumnAdaptation.positionColumn()); + rowPositionColumnIndex = OptionalInt.of(readBaseColumns.indexOf(column)); } else if (orcColumn != null) { Type readType = getOrcReadType(column.getType(), typeManager); @@ -700,6 +700,7 @@ else if (orcColumn != null) { stats, reader.getCompressionKind()), baseColumnProjections), + rowPositionColumnIndex, recordReader.getStartRowPosition(), recordReader.getEndRowPosition()); } @@ -911,6 +912,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( ParquetPageSource.Builder pageSourceBuilder = ParquetPageSource.builder(); int parquetSourceChannel = 0; + OptionalInt rowPositionColumnIndex = OptionalInt.empty(); ImmutableList.Builder parquetColumnFieldsBuilder = ImmutableList.builder(); for (int columnIndex = 0; columnIndex < readBaseColumns.size(); columnIndex++) { IcebergColumnHandle column = readBaseColumns.get(columnIndex); @@ -935,6 +937,7 @@ else if (column.isMergeRowIdColumn()) { } else if (column.isRowPositionColumn()) { pageSourceBuilder.addRowIndexColumn(); + rowPositionColumnIndex = OptionalInt.of(columnIndex); } else { org.apache.parquet.schema.Type parquetField = parquetFields.get(columnIndex); @@ -972,6 +975,7 @@ else if (column.isRowPositionColumn()) { new ReaderPageSource( pageSourceBuilder.build(parquetReader), baseColumnProjections), + rowPositionColumnIndex, startRowPosition, endRowPosition); } @@ -1076,6 +1080,7 @@ private static ReaderPageSourceWithRowPositions createAvroPageSource( Map fileColumnsByIcebergId = mapIdsToAvroFields(fileFields); + OptionalInt rowPositionColumnIndex = OptionalInt.empty(); ImmutableList.Builder columnNames = ImmutableList.builder(); ImmutableList.Builder columnTypes = ImmutableList.builder(); ImmutableList.Builder rowIndexChannels = ImmutableList.builder(); @@ -1097,6 +1102,9 @@ else if (column.isMergeRowIdColumn() || column.isRowPositionColumn()) { columnNames.add(ROW_POSITION.name()); columnTypes.add(BIGINT); constantPopulatingPageSourceBuilder.addDelegateColumn(avroSourceChannel); + if (column.isRowPositionColumn()) { + rowPositionColumnIndex = OptionalInt.of(readBaseColumns.indexOf(column)); + } avroSourceChannel++; } else if (field == null) { @@ -1124,6 +1132,7 @@ else if (field == null) { rowIndexChannels.build(), newSimpleAggregatedMemoryContext())), baseColumnProjections), + rowPositionColumnIndex, Optional.empty(), Optional.empty()); } @@ -1394,11 +1403,12 @@ private static TrinoException handleException(ParquetDataSourceId dataSourceId, return new TrinoException(ICEBERG_CURSOR_ERROR, format("Failed to read Parquet file: %s", dataSourceId), exception); } - public record ReaderPageSourceWithRowPositions(ReaderPageSource readerPageSource, Optional startRowPosition, Optional endRowPosition) + public record ReaderPageSourceWithRowPositions(ReaderPageSource readerPageSource, OptionalInt rowPositionColumnIndex, Optional startRowPosition, Optional endRowPosition) { public ReaderPageSourceWithRowPositions { requireNonNull(readerPageSource, "readerPageSource is null"); + requireNonNull(rowPositionColumnIndex, "rowPositionColumnIndex is null"); requireNonNull(startRowPosition, "startRowPosition is null"); requireNonNull(endRowPosition, "endRowPosition is null"); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index ecd82d188260..79cef54cd372 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -432,10 +432,17 @@ private static IcebergColumnHandle createColumnHandle(NestedField baseColumn, Ne public static Schema schemaFromHandles(List columns) { - List icebergColumns = columns.stream() - .map(column -> NestedField.optional(column.getId(), column.getName(), toIcebergType(column.getType(), column.getColumnIdentity()))) - .collect(toImmutableList()); - return new Schema(StructType.of(icebergColumns).asStructType().fields()); + Schema schema = new Schema(); + List icebergColumns = new ArrayList<>(); + for (IcebergColumnHandle column : columns) { + NestedField field = NestedField.optional(column.getId(), column.getName(), toIcebergType(column.getType(), column.getColumnIdentity())); + // Schema disallows duplicate fields + if (schema.findField(field.fieldId()) == null) { + icebergColumns.add(field); + schema = new Schema(StructType.of(icebergColumns).asStructType().fields()); + } + } + return schema; } public static Map getIdentityPartitions(PartitionSpec partitionSpec) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java index 8ce6bbd8773a..5d78fdd04fcc 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java @@ -29,13 +29,14 @@ default RowPredicate and(RowPredicate other) return (page, position) -> test(page, position) && other.test(page, position); } - default Page filterPage(Page page) + default Page filterPage(Page page, int[] columns) { int positionCount = page.getPositionCount(); int[] retained = new int[positionCount]; int retainedCount = 0; + Page selectedPage = columns.length == 0 ? page : page.getColumns(columns); for (int position = 0; position < positionCount; position++) { - if (test(page, position)) { + if (test(selectedPage, position)) { retained[retainedCount] = position; retainedCount++; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index 67aa3ab38a62..f81514eada1b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -597,8 +597,8 @@ public void testMultipleEqualityDeletesWithNestedFields() equalityFieldIds); } - // TODO: support read equality deletes with nested fields(https://github.com/trinodb/trino/issues/18625) - assertThat(query("SELECT * FROM " + tableName)).failure().hasMessageContaining("Multiple entries with same key"); + assertThat(query("SELECT * FROM " + tableName)) + .matches("VALUES (BIGINT '1', CAST(row(10, 100) AS ROW(nested BIGINT, nested_other BIGINT)))"); assertUpdate("DROP TABLE " + tableName); }