diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 2160d3c68005..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 5 + "comment": "Modify this file in a trivial way to cause this test suite to run.", + "modification": 1 } diff --git a/CHANGES.md b/CHANGES.md index b8adeeb64362..66a10f69417c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -92,6 +92,7 @@ * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)). * [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231)) +* [Managed Iceberg] Fixed a bug where DataFile metadata was assigned incorrect partition values ([#33549](https://github.com/apache/beam/pull/33549)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). @@ -138,6 +139,11 @@ * Adding flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111)) * (Python) Fixed BigQuery Enrichment bug that can lead to multiple conditions returning duplicate rows, batching returning incorrect results and conditions not scoped by row during batching ([#32780](https://github.com/apache/beam/pull/32780)). +## Known Issues + +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 + # [2.60.0] - 2024-10-17 ## Highlights @@ -192,6 +198,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192)) * Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output. * Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results. * Fixed in 2.61.0. +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 # [2.59.0] - 2024-09-11 @@ -240,6 +248,8 @@ when running on 3.8. ([#31192](https://github.com/apache/beam/issues/31192)) * Duplicate Rows: Multiple conditions may be applied incorrectly, leading to the duplication of rows in the output. * Incorrect Results with Batched Requests: Conditions may not be correctly scoped to individual rows within the batch, potentially causing inaccurate results. * Fixed in 2.61.0. +* [Managed Iceberg] DataFile metadata is assigned incorrect partition values ([#33497](https://github.com/apache/beam/issues/33497)). + * Fixed in 2.62.0 # [2.58.1] - 2024-08-15 diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 4c21a0175ab0..63186f26fb5a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -96,7 +96,9 @@ class DestinationState { private final IcebergDestination icebergDestination; private final PartitionSpec spec; private final org.apache.iceberg.Schema schema; - private final PartitionKey partitionKey; + // used to determine the partition to which a record belongs + // must not be directly used to create a writer + private final PartitionKey routingPartitionKey; private final Table table; private final String stateToken = UUID.randomUUID().toString(); final Cache writers; @@ -109,7 +111,7 @@ class DestinationState { this.icebergDestination = icebergDestination; this.schema = table.schema(); this.spec = table.spec(); - this.partitionKey = new PartitionKey(spec, schema); + this.routingPartitionKey = new PartitionKey(spec, schema); this.table = table; for (PartitionField partitionField : spec.fields()) { partitionFieldMap.put(partitionField.name(), partitionField); @@ -154,12 +156,12 @@ class DestinationState { * can't create a new writer, the {@link Record} is rejected and {@code false} is returned. */ boolean write(Record record) { - partitionKey.partition(getPartitionableRecord(record)); + routingPartitionKey.partition(getPartitionableRecord(record)); - if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) { + if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) { return false; } - RecordWriter writer = fetchWriterForPartition(partitionKey); + RecordWriter writer = fetchWriterForPartition(routingPartitionKey); writer.write(record); return true; } @@ -173,10 +175,12 @@ private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) { RecordWriter recordWriter = writers.getIfPresent(partitionKey); if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) { + // each writer must have its own PartitionKey object + PartitionKey copy = partitionKey.copy(); // calling invalidate for a non-existent key is a safe operation - writers.invalidate(partitionKey); - recordWriter = createWriter(partitionKey); - writers.put(partitionKey, recordWriter); + writers.invalidate(copy); + recordWriter = createWriter(copy); + writers.put(copy, recordWriter); } return recordWriter; } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index a060bc16d6c7..39b8899456b5 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -29,10 +29,13 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -68,6 +71,7 @@ import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; @@ -78,7 +82,10 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.PartitionUtil; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -263,6 +270,22 @@ private List populateTable(Table table) throws IOException { return expectedRows; } + private static Map constantsMap( + FileScanTask task, + BiFunction converter, + org.apache.iceberg.Schema schema) { + PartitionSpec spec = task.spec(); + Set idColumns = spec.identitySourceIds(); + org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns); + boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty(); + + if (projectsIdentityPartitionColumns) { + return PartitionUtil.constantsMap(task, converter); + } else { + return Collections.emptyMap(); + } + } + private List readRecords(Table table) { Schema tableSchema = table.schema(); TableScan tableScan = table.newScan().project(tableSchema); @@ -271,13 +294,16 @@ private List readRecords(Table table) { InputFilesDecryptor descryptor = new InputFilesDecryptor(task, table.io(), table.encryption()); for (FileScanTask fileTask : task.files()) { + Map idToConstants = + constantsMap(fileTask, IdentityPartitionConverters::convertConstant, tableSchema); InputFile inputFile = descryptor.getInputFile(fileTask); CloseableIterable iterable = Parquet.read(inputFile) .split(fileTask.start(), fileTask.length()) .project(tableSchema) .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(tableSchema, fileSchema)) + fileSchema -> + GenericParquetReaders.buildReader(tableSchema, fileSchema, idToConstants)) .filter(fileTask.residual()) .build();