Skip to content

Commit

Permalink
Kernel: Support schema evolution through existing withSchema API on TransactionBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Feb 27, 2025
1 parent 6e9498c commit 4278745
Show file tree
Hide file tree
Showing 5 changed files with 571 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private final Map<String, DomainMetadata> domainMetadatasAdded = new HashMap<>();
private final Set<String> domainMetadatasRemoved = new HashSet<>();
private Optional<StructType> schema = Optional.empty();
private boolean updatedSchema;
private Optional<List<String>> partitionColumns = Optional.empty();
private Optional<SetTransaction> setTxnOpt = Optional.empty();
private Optional<Map<String, String>> tableProperties = Optional.empty();
Expand All @@ -79,6 +80,7 @@ public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation oper
@Override
public TransactionBuilder withSchema(Engine engine, StructType newSchema) {
  // Record that the caller explicitly supplied a schema; full validation of the
  // schema itself is deferred until build() runs.
  this.updatedSchema = true;
  this.schema = Optional.of(newSchema);
  return this;
}

Expand Down Expand Up @@ -166,15 +168,23 @@ public Transaction build(Engine engine) {
boolean shouldUpdateProtocol = false;
Metadata metadata = snapshot.getMetadata();
Protocol protocol = snapshot.getProtocol();
Map<String, String> validatedProperties =
TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap()));
Map<String, String> newProperties = metadata.filterOutUnchangedProperties(validatedProperties);

ColumnMapping.verifyColumnMappingChange(metadata.getConfiguration(), newProperties, isNewTable);
if (tableProperties.isPresent()) {
Map<String, String> validatedProperties =
TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap()));
Map<String, String> newProperties = metadata.filterOutUnchangedProperties(validatedProperties);

if (!newProperties.isEmpty()) {
ColumnMapping.verifyColumnMappingChange(metadata.getConfiguration(), newProperties, isNewTable);

if (!newProperties.isEmpty()) {
shouldUpdateMetadata = true;
metadata = metadata.withNewConfiguration(newProperties);
}
}

if (updatedSchema) {
shouldUpdateMetadata = true;
metadata = metadata.withNewConfiguration(newProperties);
metadata = metadata.withNewSchema(schema.get());
}

Optional<Tuple2<Protocol, Set<TableFeature>>> newProtocolAndFeatures =
Expand Down Expand Up @@ -204,7 +214,8 @@ public Transaction build(Engine engine) {
shouldUpdateProtocol,
maxRetries,
table.getClock(),
getDomainMetadatasToCommit(snapshot));
getDomainMetadatasToCommit(snapshot),
!isNewTable && updatedSchema);
}

/** Validate the given parameters for the transaction. */
Expand All @@ -213,14 +224,26 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
// Validate the table has no features that Kernel doesn't yet support writing into it.
TableFeatures.validateKernelCanWriteToTable(
snapshot.getProtocol(), snapshot.getMetadata(), tablePath);
ColumnMappingMode mappingMode =
ColumnMapping.getColumnMappingMode(
isNewTable
? tableProperties.orElse(Collections.emptyMap())
: snapshot.getMetadata().getConfiguration());

if (!isNewTable) {
if (schema.isPresent()) {
throw tableAlreadyExists(
tablePath,
"Table already exists, but provided a new schema. "
+ "Schema can only be set on a new table.");
boolean columnMappingEnabled = isColumnMappingModeEnabled(mappingMode);
if (!columnMappingEnabled && updatedSchema) {
throw new IllegalArgumentException(
"Cannot update schema for table when column mapping is disabled");
}

if (updatedSchema) {
// If overriding a schema on the existing table, the actual column IDs on the new schema
// should be validated
SchemaUtils.validateUpdatedSchema(snapshot.getSchema(), schema.get());
SchemaUtils.validatePartitionColumns(schema.get(), snapshot.getPartitionColumnNames());
}

if (partitionColumns.isPresent()) {
throw tableAlreadyExists(
tablePath,
Expand All @@ -229,9 +252,6 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
}
} else {
// New table verify the given schema and partition columns
ColumnMappingMode mappingMode =
ColumnMapping.getColumnMappingMode(tableProperties.orElse(Collections.emptyMap()));

SchemaUtils.validateSchema(schema.get(), isColumnMappingModeEnabled(mappingMode));
SchemaUtils.validatePartitionColumns(
schema.get(), partitionColumns.orElse(Collections.emptyList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public class TransactionImpl implements Transaction {
private final Optional<SetTransaction> setTxnOpt;
private final boolean shouldUpdateProtocol;
private final Clock clock;
private List<DomainMetadata> domainMetadatas;
private final boolean preserveFieldIds;

private List<DomainMetadata> domainMetadatas = new ArrayList<>();
private Metadata metadata;
private boolean shouldUpdateMetadata;
private int maxRetries;
Expand All @@ -94,7 +96,8 @@ public TransactionImpl(
boolean shouldUpdateProtocol,
int maxRetries,
Clock clock,
List<DomainMetadata> domainMetadatas) {
List<DomainMetadata> domainMetadatas,
boolean preserveFieldIds) {
this.isNewTable = isNewTable;
this.dataPath = dataPath;
this.logPath = logPath;
Expand All @@ -109,6 +112,7 @@ public TransactionImpl(
this.maxRetries = maxRetries;
this.clock = clock;
this.domainMetadatas = domainMetadatas;
this.preserveFieldIds = preserveFieldIds;
}

@Override
Expand Down Expand Up @@ -301,11 +305,14 @@ private TransactionCommitResult doCommit(
List<Row> metadataActions = new ArrayList<>();
metadataActions.add(createCommitInfoSingleAction(attemptCommitInfo.toRow()));
if (shouldUpdateMetadata || isNewTable) {
this.metadata =
ColumnMapping.updateColumnMappingMetadata(
metadata,
ColumnMapping.getColumnMappingMode(metadata.getConfiguration()),
isNewTable);
if (!preserveFieldIds) {
this.metadata =
ColumnMapping.updateColumnMappingMetadata(
metadata,
ColumnMapping.getColumnMappingMode(metadata.getConfiguration()),
isNewTable);
}

metadataActions.add(createMetadataSingleAction(metadata.toRow()));
}
if (shouldUpdateProtocol || isNewTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,111 @@ static int findMaxColumnId(StructType schema) {
return maxColumnId;
}

/**
 * Validates the column-mapping ids and physical names of {@code updatedSchema} against
 * {@code currentSchema}: every field must carry a unique id and physical name, and any field id
 * that already exists in the current schema must keep the physical name it had.
 */
static void validateColumnIds(StructType currentSchema, StructType updatedSchema) {
  // Collect id -> physical-name for the current schema, validating each field on the way.
  Map<Long, String> existingIdsToPhysicalNames = new HashMap<>();
  for (StructField existingField : currentSchema.fields()) {
    validateColumnIds(existingField, existingIdsToPhysicalNames);
  }

  // Same for the proposed schema.
  Map<Long, String> newIdsToPhysicalNames = new HashMap<>();
  for (StructField newField : updatedSchema.fields()) {
    validateColumnIds(newField, newIdsToPhysicalNames);
  }

  // Physical names must be unique across the updated schema.
  Set<String> uniquePhysicalNames = new HashSet<>(newIdsToPhysicalNames.values());
  if (uniquePhysicalNames.size() != newIdsToPhysicalNames.size()) {
    throw new IllegalArgumentException("Assigned physical names must be unique");
  }

  // A field id that survives the update must keep its original physical name.
  newIdsToPhysicalNames.forEach(
      (fieldId, physicalName) -> {
        String existingPhysicalName = existingIdsToPhysicalNames.get(fieldId);
        if (existingPhysicalName != null && !existingPhysicalName.equals(physicalName)) {
          throw new IllegalArgumentException(
              String.format(
                  "Existing field with id %s in current schema has physical name %s which is different from %s",
                  fieldId, existingPhysicalName, physicalName));
        }
      });
}

/**
 * Validates the column-mapping metadata of a single field, recursing into struct children: the
 * field must carry both a physical name and a column id, the id must not collide with any id
 * seen so far, and map/array fields must carry the expected number of unique nested ids.
 *
 * @param field the field to validate
 * @param fieldIdToPhysicalName accumulator of (column id -> physical name) seen so far; used to
 *     detect duplicate ids and updated with this field's id
 * @throws IllegalArgumentException if any id/physical-name requirement is violated
 */
private static void validateColumnIds(
    StructField field, Map<Long, String> fieldIdToPhysicalName) {
  if (!hasPhysicalName(field)) {
    throw new IllegalArgumentException(
        String.format(
            "Column mapping mode is name and field %s is missing physical name",
            field.getName()));
  }

  if (!hasColumnId(field)) {
    throw new IllegalArgumentException(
        String.format(
            "Column mapping mode is name and field %s is missing column id", field.getName()));
  }

  long columnId = getColumnId(field);

  if (fieldIdToPhysicalName.containsKey(columnId)) {
    throw new IllegalArgumentException(
        String.format("Field %s with id %d already exists", field.getName(), columnId));
  }

  String physicalName = getPhysicalName(field);
  fieldIdToPhysicalName.put(columnId, physicalName);

  if (field.getDataType() instanceof MapType) {
    // A map needs exactly two nested ids: one for the key, one for the value.
    if (!hasNestedColumnIds(field)) {
      throw new IllegalArgumentException(
          String.format("Map field %s must have exactly 2 nested IDs", field.getName()));
    }

    List<Long> nestedFieldIds = getNestedFieldIds(field);

    if (nestedFieldIds.size() != 2) {
      throw new IllegalArgumentException(
          String.format("Map field %s must have exactly 2 nested IDs", field.getName()));
    }

    Set<Long> dedupedNestedFieldIds = new HashSet<>(nestedFieldIds);
    if (nestedFieldIds.size() != dedupedNestedFieldIds.size()) {
      throw new IllegalArgumentException(
          String.format("Map field %s cannot contain duplicate nested IDs", field.getName()));
    }

    for (Long id : dedupedNestedFieldIds) {
      if (fieldIdToPhysicalName.containsKey(id)) {
        throw new IllegalArgumentException(
            String.format("Nested field with id %s already exists", id));
      }
    }

  } else if (field.getDataType() instanceof ArrayType) {
    // An array needs exactly one nested id for its element.
    if (!hasNestedColumnIds(field)) {
      throw new IllegalArgumentException(
          String.format("Array field %s must have exactly 1 nested ID", field.getName()));
    }

    List<Long> nestedFieldIds = getNestedFieldIds(field);
    if (nestedFieldIds.size() != 1) {
      throw new IllegalArgumentException(
          String.format("Array field %s must have exactly 1 nested ID", field.getName()));
    }

    // Fix: mirror the map branch above — the original never checked an array's nested id
    // against ids already used elsewhere in the schema, so a collision went undetected.
    for (Long id : nestedFieldIds) {
      if (fieldIdToPhysicalName.containsKey(id)) {
        throw new IllegalArgumentException(
            String.format("Nested field with id %s already exists", id));
      }
    }
  } else if (field.getDataType() instanceof StructType) {
    // Struct children carry their own ids/physical names; validate them recursively.
    StructType structType = (StructType) field.getDataType();
    for (StructField nestedField : structType.fields()) {
      validateColumnIds(nestedField, fieldIdToPhysicalName);
    }
  }
}

/** Returns the Long-typed entries of the field's nested-column-id metadata. */
private static List<Long> getNestedFieldIds(StructField field) {
  return getNestedColumnIds(field).getEntries().values().stream()
      .filter(value -> value instanceof Long)
      .map(value -> (Long) value)
      .collect(Collectors.toList());
}

private static int findMaxColumnId(StructField field, int maxColumnId) {
if (hasColumnId(field)) {
maxColumnId = Math.max(maxColumnId, getColumnId(field));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.delta.kernel.internal.util;

import static io.delta.kernel.internal.DeltaErrors.*;
import static io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_ID_KEY;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import io.delta.kernel.expressions.Literal;
Expand Down Expand Up @@ -78,6 +79,59 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab
validateSupportedType(schema);
}

/**
 * Validates that {@code schema} is a legal replacement for {@code currentSchema} on an existing
 * table: the new schema must be well-formed with column mapping enabled, carry valid and
 * consistent column ids/physical names, and be compatible with the current schema (no data-type
 * changes to existing fields, no newly added non-nullable fields).
 *
 * @param currentSchema the schema of the table as it exists today
 * @param schema the proposed new schema
 * @throws IllegalArgumentException if any of the validations fail
 */
public static void validateUpdatedSchema(StructType currentSchema, StructType schema) {
  validateSchema(schema, true); // column mapping must be enabled for schema evolution
  ColumnMapping.validateColumnIds(currentSchema, schema);
  validateUpdatedSchemaCompatibility(currentSchema, schema);
}

/**
 * Checks evolution compatibility between the current and the new schema, matching fields by
 * their column-mapping id: a field that only appears in the new schema must be nullable, and a
 * field present in both schemas must keep its data type.
 *
 * @throws IllegalArgumentException if a non-nullable field is added or an existing field's data
 *     type is changed
 */
private static void validateUpdatedSchemaCompatibility(
    StructType currentSchema, StructType newSchema) {
  // Identify added columns based on field IDs
  Map<Integer, StructField> newSchemaIdToField = idToField(newSchema);
  Map<Integer, StructField> currentSchemaIdToField = idToField(currentSchema);
  for (Map.Entry<Integer, StructField> newFieldEntry : newSchemaIdToField.entrySet()) {
    StructField newField = newFieldEntry.getValue();
    StructField currentField = currentSchemaIdToField.get(newFieldEntry.getKey());
    if (currentField == null) {
      // The id does not exist in the current schema => this is an added column.
      if (!newField.isNullable()) {
        throw new IllegalArgumentException(
            String.format("Cannot add a non-nullable field %s", newField.getName()));
      }
    } else if (!newField.getDataType().equals(currentField.getDataType())) {
      // BUG FIX: the original compared DataType instances with !=, i.e. by reference.
      // Equal-but-distinct instances (e.g. two equivalent ArrayType objects) would be
      // wrongly flagged as a type change; equals() compares by value.
      throw new IllegalArgumentException(
          String.format(
              "Cannot change existing field %s from %s to %s",
              currentField.getName(), currentField.getDataType(), newField.getDataType()));
    }
  }
}

/** Builds a map from column-mapping id to field for every field reachable from the schema. */
private static Map<Integer, StructField> idToField(StructType schema) {
  Map<Integer, StructField> result = new HashMap<>();
  // Merge the per-field maps; each covers the field itself plus its struct descendants.
  for (StructField topLevelField : schema.fields()) {
    result.putAll(idToField(topLevelField));
  }
  return result;
}

// Maps this field's column-mapping id (narrowed to int) to the field itself, recursing into
// struct-typed children.
//
// NOTE(review): only StructType children are traversed; fields nested inside map or array
// element types are not visited here — confirm that matches how nested ids are assigned.
private static Map<Integer, StructField> idToField(StructField field) {
  Map<Integer, StructField> idToField = new HashMap<>();
  // Assumes the column-mapping id key is present in the field metadata; callers go through
  // ColumnMapping.validateColumnIds first, which enforces that every field carries an id.
  idToField.put(field.getMetadata().getLong(COLUMN_MAPPING_ID_KEY).intValue(), field);
  if (field.getDataType() instanceof StructType) {
    StructType structType = (StructType) field.getDataType();
    for (StructField nestedField : structType.fields()) {
      idToField.putAll(idToField(nestedField));
    }
  }

  return idToField;
}

/**
* Verify the partition columns exists in the table schema and a supported data type for a
* partition column.
Expand Down
Loading

0 comments on commit 4278745

Please sign in to comment.