From 42787459bcec78e1e8674f70a210dbc4b042baff Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 27 Feb 2025 09:16:19 +0530 Subject: [PATCH] Kernel: Support schema evolution through existing withSchema API on TransactionBuilder --- .../internal/TransactionBuilderImpl.java | 50 ++- .../kernel/internal/TransactionImpl.java | 21 +- .../kernel/internal/util/ColumnMapping.java | 105 +++++ .../kernel/internal/util/SchemaUtils.java | 54 +++ .../defaults/DeltaTableWritesSuite.scala | 363 ++++++++++++++++++ 5 files changed, 571 insertions(+), 22 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index 24613905e33..ba5da90a783 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -59,6 +59,7 @@ public class TransactionBuilderImpl implements TransactionBuilder { private final Map domainMetadatasAdded = new HashMap<>(); private final Set domainMetadatasRemoved = new HashSet<>(); private Optional schema = Optional.empty(); + private boolean updatedSchema; /* true once withSchema() has been called */ private Optional> partitionColumns = Optional.empty(); private Optional setTxnOpt = Optional.empty(); private Optional> tableProperties = Optional.empty(); @@ -79,6 +80,7 @@ public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation oper @Override public TransactionBuilder withSchema(Engine engine, StructType newSchema) { this.schema = Optional.of(newSchema); // will be verified as part of the build() call + this.updatedSchema = true; return this; } @@ -166,15 +168,23 @@ public Transaction build(Engine engine) { boolean shouldUpdateProtocol = false; Metadata metadata = snapshot.getMetadata(); Protocol protocol = snapshot.getProtocol(); - Map validatedProperties = - 
TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap())); - Map newProperties = metadata.filterOutUnchangedProperties(validatedProperties); - ColumnMapping.verifyColumnMappingChange(metadata.getConfiguration(), newProperties, isNewTable); + if (tableProperties.isPresent()) { /* only validate properties the caller supplied */ + Map validatedProperties = + TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap())); + Map newProperties = metadata.filterOutUnchangedProperties(validatedProperties); - if (!newProperties.isEmpty()) { + ColumnMapping.verifyColumnMappingChange(metadata.getConfiguration(), newProperties, isNewTable); + + if (!newProperties.isEmpty()) { + shouldUpdateMetadata = true; + metadata = metadata.withNewConfiguration(newProperties); + } + } + + if (updatedSchema) { /* a schema was supplied: carry it into the new metadata */ shouldUpdateMetadata = true; - metadata = metadata.withNewConfiguration(newProperties); + metadata = metadata.withNewSchema(schema.get()); } Optional>> newProtocolAndFeatures = @@ -204,7 +214,8 @@ public Transaction build(Engine engine) { shouldUpdateProtocol, maxRetries, table.getClock(), - getDomainMetadatasToCommit(snapshot)); + getDomainMetadatasToCommit(snapshot), + !isNewTable && updatedSchema); /* preserveFieldIds argument */ } /** Validate the given parameters for the transaction. */ @@ -213,14 +224,26 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable) // Validate the table has no features that Kernel doesn't yet support writing into it. TableFeatures.validateKernelCanWriteToTable( snapshot.getProtocol(), snapshot.getMetadata(), tablePath); + ColumnMappingMode mappingMode = + ColumnMapping.getColumnMappingMode( + isNewTable + ? tableProperties.orElse(Collections.emptyMap()) + : snapshot.getMetadata().getConfiguration()); /* existing table: use its current configuration */ if (!isNewTable) { - if (schema.isPresent()) { - throw tableAlreadyExists( - tablePath, - "Table already exists, but provided a new schema. 
" + "Schema can only be set on a new table."); + boolean columnMappingEnabled = isColumnMappingModeEnabled(mappingMode); + if (!columnMappingEnabled && updatedSchema) { /* updated schemas are validated by column ID */ + throw new IllegalArgumentException( + "Cannot update schema for table when column mapping is disabled"); + } + + if (updatedSchema) { + // If overriding a schema on the existing table, the actual column IDs on the new schema + // should be validated + SchemaUtils.validateUpdatedSchema(snapshot.getSchema(), schema.get()); + SchemaUtils.validatePartitionColumns(schema.get(), snapshot.getPartitionColumnNames()); } + if (partitionColumns.isPresent()) { throw tableAlreadyExists( tablePath, @@ -229,9 +252,6 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable) } } else { // New table verify the given schema and partition columns - ColumnMappingMode mappingMode = - ColumnMapping.getColumnMappingMode(tableProperties.orElse(Collections.emptyMap())); - SchemaUtils.validateSchema(schema.get(), isColumnMappingModeEnabled(mappingMode)); SchemaUtils.validatePartitionColumns( schema.get(), partitionColumns.orElse(Collections.emptyList())); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 7e0919783df..76ff72825d0 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -73,7 +73,9 @@ public class TransactionImpl implements Transaction { private final Optional setTxnOpt; private final boolean shouldUpdateProtocol; private final Clock clock; - private List domainMetadatas; + private final boolean preserveFieldIds; /* when true, doCommit() skips updateColumnMappingMetadata */ + + private List domainMetadatas = new ArrayList<>(); /* replaced by the constructor argument */ private Metadata metadata; private boolean shouldUpdateMetadata; private int maxRetries; @@ -94,7 +96,8 @@ public TransactionImpl( boolean shouldUpdateProtocol, int maxRetries, 
Clock clock, - List domainMetadatas) { + List domainMetadatas, + boolean preserveFieldIds) { this.isNewTable = isNewTable; this.dataPath = dataPath; this.logPath = logPath; @@ -109,6 +112,7 @@ public TransactionImpl( this.maxRetries = maxRetries; this.clock = clock; this.domainMetadatas = domainMetadatas; + this.preserveFieldIds = preserveFieldIds; } @Override @@ -301,11 +305,14 @@ private TransactionCommitResult doCommit( List metadataActions = new ArrayList<>(); metadataActions.add(createCommitInfoSingleAction(attemptCommitInfo.toRow())); if (shouldUpdateMetadata || isNewTable) { - this.metadata = - ColumnMapping.updateColumnMappingMetadata( - metadata, - ColumnMapping.getColumnMappingMode(metadata.getConfiguration()), - isNewTable); + if (!preserveFieldIds) { /* NOTE(review): confirm max column ID is still refreshed when skipped */ + this.metadata = + ColumnMapping.updateColumnMappingMetadata( + metadata, + ColumnMapping.getColumnMappingMode(metadata.getConfiguration()), + isNewTable); + } + metadataActions.add(createMetadataSingleAction(metadata.toRow())); } if (shouldUpdateProtocol || isNewTable) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java index 205232d489e..91ad7f1abca 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ColumnMapping.java @@ -156,6 +156,111 @@ static int findMaxColumnId(StructType schema) { return maxColumnId; } + static void validateColumnIds(StructType currentSchema, StructType updatedSchema) { + Map currentFieldIdToPhysicalName = new HashMap<>(); /* column ID -> physical name */ + for (StructField field : currentSchema.fields()) { + validateColumnIds(field, currentFieldIdToPhysicalName); + } + + Map updatedFieldIdToPhysicalName = new HashMap<>(); + for (StructField field : updatedSchema.fields()) { + validateColumnIds(field, updatedFieldIdToPhysicalName); + } + + Set dedupedPhysicalNames = new 
HashSet<>(updatedFieldIdToPhysicalName.values()); + if (dedupedPhysicalNames.size() != updatedFieldIdToPhysicalName.size()) { + throw new IllegalArgumentException("Assigned physical names must be unique"); + } + + for (Map.Entry field : updatedFieldIdToPhysicalName.entrySet()) { + String existingPhysicalName = currentFieldIdToPhysicalName.get(field.getKey()); + // An ID that is also in the current schema must keep its physical name + if (existingPhysicalName != null && !existingPhysicalName.equals(field.getValue())) { + throw new IllegalArgumentException( + String.format( + "Existing field with id %s in current schema has physical name %s which is different from %s", + field.getKey(), existingPhysicalName, field.getValue())); + } + } + } + + private static void validateColumnIds( + StructField field, Map fieldIdToPhysicalName) { /* map is filled as fields are visited */ + if (!hasPhysicalName(field)) { + throw new IllegalArgumentException( + String.format( + "Column mapping mode is name and field %s is missing physical name", + field.getName())); + } + + if (!hasColumnId(field)) { + throw new IllegalArgumentException( + String.format( + "Column mapping mode is name and field %s is missing column id", field.getName())); + } + + long columnId = getColumnId(field); + + if (fieldIdToPhysicalName.containsKey(columnId)) { + throw new IllegalArgumentException( + String.format("Field %s with id %d already exists", field.getName(), columnId)); + } + + String physicalName = getPhysicalName(field); + fieldIdToPhysicalName.put(columnId, physicalName); /* record this field's ID */ + + if (field.getDataType() instanceof MapType) { /* maps carry 2 nested IDs */ + if (!hasNestedColumnIds(field)) { + throw new IllegalArgumentException( + String.format("Map field %s must have exactly 2 nested IDs", field.getName())); + } + + List nestedFieldIds = getNestedFieldIds(field); + + if (nestedFieldIds.size() != 2) { + throw new IllegalArgumentException( + String.format("Map field %s must have exactly 2 nested IDs", field.getName())); + } + + Set dedupedNestedFieldIds = new 
HashSet<>(nestedFieldIds); + if (nestedFieldIds.size() != dedupedNestedFieldIds.size()) { + throw new IllegalArgumentException( + String.format("Map field %s cannot contain duplicate nested IDs", field.getName())); + } + + for (Long id : dedupedNestedFieldIds) { /* checked against IDs recorded so far */ + if (fieldIdToPhysicalName.containsKey(id)) { + throw new IllegalArgumentException( + String.format("Nested field with id %s already exists", id)); + } + } + + } else if (field.getDataType() instanceof ArrayType) { + if (!hasNestedColumnIds(field)) { + throw new IllegalArgumentException( + String.format("Array field %s must have exactly 1 nested ID", field.getName())); + } + + List nestedFieldIds = getNestedFieldIds(field); + if (nestedFieldIds.size() != 1) { /* NOTE(review): no ID-reuse check here, unlike maps */ + throw new IllegalArgumentException( + String.format("Array field %s must have exactly 1 nested ID", field.getName())); + } + } else if (field.getDataType() instanceof StructType) { + StructType structType = (StructType) field.getDataType(); + for (StructField nestedField : structType.fields()) { + validateColumnIds(nestedField, fieldIdToPhysicalName); + } + } + } + + private static List getNestedFieldIds(StructField field) { + return getNestedColumnIds(field).getEntries().values().stream() + .filter(Long.class::isInstance) /* non-Long entries are ignored */ + .map(Long.class::cast) + .collect(Collectors.toList()); + } + private static int findMaxColumnId(StructField field, int maxColumnId) { if (hasColumnId(field)) { maxColumnId = Math.max(maxColumnId, getColumnId(field)); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java index a5c07dc2450..cec9776627d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java @@ -16,6 +16,7 @@ package io.delta.kernel.internal.util; import static io.delta.kernel.internal.DeltaErrors.*; +import static 
io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_ID_KEY; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import io.delta.kernel.expressions.Literal; @@ -78,6 +79,59 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab validateSupportedType(schema); } + public static void validateUpdatedSchema(StructType currentSchema, StructType schema) { + validateSchema(schema, true); /* true: column mapping enabled */ + ColumnMapping.validateColumnIds(currentSchema, schema); + validateUpdatedSchemaCompatibility(currentSchema, schema); + } + + private static void validateUpdatedSchemaCompatibility( + StructType currentSchema, StructType newSchema) { + // Fields are matched between the two schemas by column ID, not by name + Map newSchemaIdToField = idToField(newSchema); + Map currentSchemaIdToField = idToField(currentSchema); + for (Map.Entry newFieldEntry : newSchemaIdToField.entrySet()) { + if (!currentSchemaIdToField.containsKey(newFieldEntry.getKey())) { + if (!newFieldEntry.getValue().isNullable()) { /* added fields must be nullable */ + throw new IllegalArgumentException( + String.format( + "Cannot add a non-nullable field %s", newFieldEntry.getValue().getName())); + } + } else { + StructField currentField = currentSchemaIdToField.get(newFieldEntry.getKey()); + StructField newField = newFieldEntry.getValue(); + if (!newField.getDataType().equals(currentField.getDataType())) { /* value equality */ + throw new IllegalArgumentException( + String.format( + "Cannot change existing field %s from %s to %s", + currentField.getName(), currentField.getDataType(), newField.getDataType())); + } + } + } + } + + private static Map idToField(StructType schema) { + Map idToField = new HashMap<>(); /* column ID -> field */ + for (StructField field : schema.fields()) { + idToField.putAll(idToField(field)); + } + + return idToField; + } + + private static Map idToField(StructField field) { + Map idToField = new HashMap<>(); + idToField.put(field.getMetadata().getLong(COLUMN_MAPPING_ID_KEY).intValue(), field); + if (field.getDataType() instanceof 
StructType) { + StructType structType = (StructType) field.getDataType(); + for (StructField nestedField : structType.fields()) { /* only struct children are indexed */ + idToField.putAll(idToField(nestedField)); + } + } + + return idToField; + } + /** * Verify the partition columns exists in the table schema and a supported data type for a * partition column. diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 3d04d95e1a6..6d97e0ea1f5 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -1321,6 +1321,369 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } + test("Test set schema on existing table") { + withTempDirAndEngine { (tablePath, engine) => /* adds struct b with caller-chosen IDs */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", StringType.STRING, true) + .add("c", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema() + val newSchema = new StructType() + .add("a", StringType.STRING, true, currentSchema.get("a").getMetadata) + .add("b", + new StructType() + .add("d", IntegerType.INTEGER, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 4) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "d").build()) + .add("e", IntegerType.INTEGER, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 5) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "e").build()), true, + 
FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 3) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "b").build()) + .add("c", IntegerType.INTEGER, true, currentSchema.get("c").getMetadata) + + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + + val structType = table.getLatestSnapshot(engine).getSchema + assertColumnMapping(structType.get("a"), 1) + + val innerStruct = structType.get("b").getDataType.asInstanceOf[StructType] + assertColumnMapping(innerStruct.get("d"), 4, "d") + assertColumnMapping(innerStruct.get("e"), 5, "e") + assertColumnMapping(structType.get("c"), 2) + } + } + + test("Test cannot update schema on table when column mapping disabled") { + withTempDirAndEngine { (tablePath, engine) => /* mapping mode "none": update rejected */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", StringType.STRING, true) + .add("c", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map( + TableConfig.COLUMN_MAPPING_MODE.getKey -> "none")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, currentSchema.get("a").getMetadata) + .add("b", + new StructType() + .add("d", IntegerType.INTEGER, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 4) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "d").build()), + true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 3) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "b").build()) + .add("c", IntegerType.INTEGER, true, currentSchema.get("c").getMetadata) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + 
.build(engine).commit(engine, emptyIterable()) + } + + assert(e.getMessage.contains( + "Cannot update schema for table when column mapping is disabled")) + } + } + + test("Test cannot update schema with duplicate field IDs") { + withTempDirAndEngine { (tablePath, engine) => /* a nested field reuses ID 1 */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", StringType.STRING, true) + .add("c", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, currentSchema.get("a").getMetadata) + .add("b", + new StructType() + .add("duplicate_field_id", IntegerType.INTEGER, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "d").build()) + .add("e", IntegerType.INTEGER, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 5) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "e").build()), true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 3) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "b").build()) + .add("c", IntegerType.INTEGER, true, currentSchema.get("c").getMetadata) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + } + assert(e.getMessage.contains( + "Field duplicate_field_id with id 1 already exists")) + } + } + + test("Test cannot add non-nullable field") { + withTempDirAndEngine { (tablePath, engine) => /* new field with ID 4 is non-nullable */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + 
.add("a", StringType.STRING, true) + .add("c", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, currentSchema.get("a").getMetadata) + .add("b", + new StructType() + .add("non_nullable_field", IntegerType.INTEGER, false, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 4) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "d").build()) + .add("e", IntegerType.INTEGER, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 5) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "e").build()), true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 3) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "b").build()) + .add("c", IntegerType.INTEGER, true, currentSchema.get("c").getMetadata) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + } + assert(e.getMessage.contains( + "Cannot add a non-nullable field non_nullable_field")) + } + } + + test("Test cannot drop a partition column") { + withTempDirAndEngine { (tablePath, engine) => /* new schema omits partition column c */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", StringType.STRING, true) + .add("c", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq("c"), + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = 
table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, currentSchema.get("a").getMetadata) + .add("b", + new StructType() + .add("d", IntegerType.INTEGER, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 4) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "d").build()) + .add("e", IntegerType.INTEGER, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 5) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "e").build()), true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 3) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "b").build()) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + } + assert(e.getMessage.contains( + "Partition column c not found in the schema")) + } + } + + test("Test cannot promote types via withSchema") { + withTempDirAndEngine { (tablePath, engine) => /* c changes from integer to long */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", StringType.STRING, true) + .add("c", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, currentSchema.get("a").getMetadata) + .add("c", LongType.LONG, true, currentSchema.get("c").getMetadata) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + } + 
assert(e.getMessage.contains( + "Cannot change existing field c from integer to long")) + } + } + + test("Test fail updating schema if physical columns are not preserved") { + withTempDirAndEngine { (tablePath, engine) => /* ID 1 gets a new physical name */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", StringType.STRING, true) + .add("c", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 1) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, + "not_preserving_physical_column").build()) + .add("c", LongType.LONG, true, currentSchema.get("c").getMetadata) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + } + assert(e.getMessage.contains( + "Existing field with id 1 in current schema has physical name")) + } + } + + test("Test fail updating schema if map columns are missing nested ids for key") { + withTempDirAndEngine { (tablePath, engine) => /* only the value's nested ID is given */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType().add("a", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, 
currentSchema.get("a").getMetadata) + .add("c", new MapType(StringType.STRING, StringType.STRING, false), true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 2) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "c") + .putFieldMetadata(ColumnMapping.COLUMN_MAPPING_NESTED_IDS_KEY, + FieldMetadata.builder().putLong("c.value", 3).build()).build()) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + } + + assert(e.getMessage.contains( + "Map field c must have exactly 2 nested IDs")) + } + } + + test("Test fail updating schema if map columns are missing nested ids") { + withTempDirAndEngine { (tablePath, engine) => /* map column has no nested-ID metadata */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, currentSchema.get("a").getMetadata) + .add("c", new MapType( + StringType.STRING, StringType.STRING, false), true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 2) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "c").build()) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + } + + assert(e.getMessage.contains( + "Map field c must have exactly 2 nested IDs")) + } + } + + test("Test fail updating schema if array columns is missing nested id") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", 
IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", StringType.STRING, true, currentSchema.get("a").getMetadata) + .add("c", new ArrayType(StringType.STRING, false), true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 2) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "c").build()) + + val e = intercept[IllegalArgumentException] { + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + } + + assert(e.getMessage.contains( + "Array field c must have exactly 1 nested ID")) + } + } + + test("Test updating schema with adding an array and map type") { + withTempDirAndEngine { (tablePath, engine) => /* nested IDs supplied for both columns */ + val table = Table.forPath(engine, tablePath) + val schema = new StructType() + .add("a", IntegerType.INTEGER, true) + + createTxn(engine, tablePath, isNewTable = true, schema, partCols = Seq.empty, + tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "id", + TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true")).commit(engine, emptyIterable()) + + val currentSchema = table.getLatestSnapshot(engine).getSchema + val newSchema = new StructType() + .add("a", IntegerType.INTEGER, true, currentSchema.get("a").getMetadata) + .add("arr", new ArrayType(StringType.STRING, false), true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 2) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "arr") + .putFieldMetadata(ColumnMapping.COLUMN_MAPPING_NESTED_IDS_KEY, + FieldMetadata.builder().putLong("arr.element", 3).build()).build()) + .add("map", new 
MapType(StringType.STRING, StringType.STRING, false), true, + FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 4) + .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "map") + .putFieldMetadata(ColumnMapping.COLUMN_MAPPING_NESTED_IDS_KEY, + FieldMetadata.builder().putLong("map.key", 5) + .putLong("map.value", 6).build()).build()) + + table.createTransactionBuilder(engine, testEngineInfo, Operation.MANUAL_UPDATE) + .withSchema(engine, newSchema) + .build(engine).commit(engine, emptyIterable()) + + val structType = table.getLatestSnapshot(engine).getSchema + assertColumnMapping(structType.get("a"), 1) + assertColumnMapping(structType.get("arr"), 2, "arr") + assertColumnMapping(structType.get("map"), 4, "map") + assert(structType.get("arr").getMetadata.get(ColumnMapping.COLUMN_MAPPING_NESTED_IDS_KEY) + == FieldMetadata.builder().putLong("arr.element", 3).build()) + assert(structType.get("map").getMetadata.get(ColumnMapping.COLUMN_MAPPING_NESTED_IDS_KEY) + == FieldMetadata.builder().putLong("map.key", 5).putLong("map.value", 6).build()) + } + } + private def assertColumnMapping( field: StructField, expId: Long,