Skip to content

Commit

Permalink
[Spark] Transition Type Widening table feature from preview to stable (
Browse files Browse the repository at this point in the history
…delta-io#4127)

## Description
## What changes were proposed in this pull request?
The type widening spec has been stable since June 2024, and both the preview
and stable table features have been supported since Delta 3.3.
Until now, the preview feature was enabled by default when enabling type
widening. This change switches to enabling the stable feature by default
instead.
Clients that only understand the preview feature (Delta 3.2, Delta 4.0
Preview) won't be able to read *newly created* tables. That means
clients will need to upgrade from Delta 3.2 to 3.3 for that specific use
case.

The preview and stable features are compatible; the only difference is
that the stable spec doesn't require storing the table version as part
of the type widening metadata. The stable feature doesn't write the
`tableVersion` field and ignores it on read.

## How was this patch tested?
Updated existing tests.

## Does this PR introduce _any_ user-facing changes?
The stable table feature `typeWidening` is now added to the table when
enabling type widening instead of the preview table feature
`typeWidening-preview`.

This will prevent Delta 3.2 from reading tables *newly created* by
upcoming Delta versions (3.4/4.0?), requiring readers to upgrade to
Delta 3.3 or above.
  • Loading branch information
johanl-db authored Feb 18, 2025
1 parent f2b17b9 commit cc68565
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ class DeltaSharingDataSourceTypeWideningSuite
.putMetadataArray(
"delta.typeChanges", Array(
new MetadataBuilder()
.putLong("tableVersion", 2)
.putString("fromType", "short")
.putString("toType", "integer")
.build()))
Expand Down
30 changes: 14 additions & 16 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -885,34 +885,32 @@ abstract class TypeWideningTableFeatureBase(name: String) extends ReaderWriterFe

/**
* Feature used for the preview phase of type widening. Tables that enabled this feature during the
* preview will keep being supported after the preview.
* preview are still supported after the preview.
*
* Note: Users can manually add both the preview and stable features to a table using ADD FEATURE,
* although that's undocumented for type widening. This is allowed: the two feature specifications
* are compatible and supported.
*/
object TypeWideningPreviewTableFeature
extends TypeWideningTableFeatureBase(name = "typeWidening-preview")

/**
* Stable feature for type widening.
*/
object TypeWideningTableFeature
extends TypeWideningTableFeatureBase(name = "typeWidening")
with FeatureAutomaticallyEnabledByMetadata {
override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def metadataRequiresFeatureToBeEnabled(
protocol: Protocol,
metadata: Metadata,
spark: SparkSession): Boolean = isTypeWideningSupportNeededByMetadata(metadata) &&
// Don't automatically enable the preview feature if the stable feature is already supported.
!protocol.isFeatureSupported(TypeWideningTableFeature)
// Don't automatically enable the stable feature if the preview feature is already supported, to
// avoid possibly breaking old clients that only support the preview feature.
!protocol.isFeatureSupported(TypeWideningPreviewTableFeature)
}

/**
* Stable feature for type widening. The stable feature isn't enabled automatically yet
* when setting the type widening table property as the feature is still in preview in this version.
* The feature spec is finalized though and by supporting the stable feature here we guarantee that
* this version can already read any table created in the future.
*
* Note: Users can manually add both the preview and stable features to a table using ADD FEATURE,
* although that's undocumented for type widening. This is allowed: the two feature specifications
* are compatible and supported.
*/
object TypeWideningTableFeature
extends TypeWideningTableFeatureBase(name = "typeWidening")

/**
* inCommitTimestamp table feature is a writer feature that makes
* every writer write a monotonically increasing timestamp inside the commit file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,18 @@ trait TypeWideningAlterTableNestedTests {
new MetadataBuilder()
.putString("toType", "short")
.putString("fromType", "byte")
.putLong("tableVersion", 2)
.build()
)).build()))
.add("m", MapType(IntegerType, IntegerType), nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "byte")
.putLong("tableVersion", 3)
.putString("fieldPath", "key")
.build(),
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putLong("tableVersion", 4)
.putString("fieldPath", "value")
.build()
)).build())
Expand All @@ -135,7 +132,6 @@ trait TypeWideningAlterTableNestedTests {
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putLong("tableVersion", 5)
.putString("fieldPath", "element")
.build()
)).build()))
Expand All @@ -162,21 +158,18 @@ trait TypeWideningAlterTableNestedTests {
new MetadataBuilder()
.putString("toType", "short")
.putString("fromType", "byte")
.putLong("tableVersion", 2)
.build()
)).build()))
.add("m", MapType(IntegerType, IntegerType), nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "byte")
.putLong("tableVersion", 2)
.putString("fieldPath", "key")
.build(),
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putLong("tableVersion", 2)
.putString("fieldPath", "value")
.build()
)).build())
Expand All @@ -185,7 +178,6 @@ trait TypeWideningAlterTableNestedTests {
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putLong("tableVersion", 2)
.putString("fieldPath", "element")
.build()
)).build()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ trait TypeWideningAlterTableTests
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putLong("tableVersion", 1)
.build()
)).build()))
checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ trait TypeWideningCompatibilityTests {
assert(readDeltaTable(tempPath).schema ===
new StructType()
.add("a", ShortType, nullable = true,
metadata = typeWideningMetadata(version = 2, ByteType, ShortType))
metadata = typeWideningMetadata(ByteType, ShortType))
.add("c", StringType, nullable = true,
metadata = new MetadataBuilder()
.putString("__CHAR_VARCHAR_TYPE_STRING", "char(3)")
Expand All @@ -149,7 +149,7 @@ trait TypeWideningCompatibilityTests {
assert(readDeltaTable(tempPath).schema ===
new StructType()
.add("a", ShortType, nullable = true,
metadata = typeWideningMetadata(version = 2, ByteType, ShortType))
metadata = typeWideningMetadata(ByteType, ShortType))
.add("c", StringType)
.add("v", StringType))
checkAnswer(readDeltaTable(tempPath),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ trait TypeWideningInsertSchemaEvolutionTests
expectedResult = ExpectedResult.Success(new StructType()
.add("a", IntegerType)
.add("b", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
)

testInserts("top-level type evolution with column upcast")(
Expand All @@ -197,7 +197,7 @@ trait TypeWideningInsertSchemaEvolutionTests
expectedResult = ExpectedResult.Success(new StructType()
.add("a", IntegerType)
.add("b", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
.add("c", IntegerType))
)

Expand All @@ -209,7 +209,7 @@ trait TypeWideningInsertSchemaEvolutionTests
expectedResult = ExpectedResult.Success(new StructType()
.add("a", IntegerType)
.add("b", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
.add("c", IntegerType)),
// SQL INSERT by name doesn't support schema evolution.
excludeInserts = insertsSQL.intersect(insertsByName)
Expand All @@ -230,16 +230,14 @@ trait TypeWideningInsertSchemaEvolutionTests
.add("s", new StructType()
.add("x", ShortType)
.add("y", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
.add("m", MapType(StringType, IntegerType), nullable = true,
metadata = typeWideningMetadata(
version = 1,
from = ShortType,
to = IntegerType,
path = Seq("value")))
.add("a", ArrayType(IntegerType), nullable = true,
metadata = typeWideningMetadata(
version = 1,
from = ShortType,
to = IntegerType,
path = Seq("element"))))
Expand All @@ -260,17 +258,15 @@ trait TypeWideningInsertSchemaEvolutionTests
.add("s", new StructType()
.add("x", ShortType)
.add("y", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
.add("z", IntegerType))
.add("m", MapType(StringType, IntegerType), nullable = true,
metadata = typeWideningMetadata(
version = 1,
from = ShortType,
to = IntegerType,
path = Seq("value")))
.add("a", ArrayType(IntegerType), nullable = true,
metadata = typeWideningMetadata(
version = 1,
from = ShortType,
to = IntegerType,
path = Seq("element"))))
Expand All @@ -291,7 +287,7 @@ trait TypeWideningInsertSchemaEvolutionTests
.add("s", new StructType()
.add("x", IntegerType)
.add("y", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType))))
)

// Interestingly, we introduced a special case to handle schema evolution / casting for structs
Expand All @@ -311,7 +307,7 @@ trait TypeWideningInsertSchemaEvolutionTests
.add("a", ArrayType(new StructType()
.add("x", IntegerType)
.add("y", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))))
)

// maps now allow type evolution for INSERT by position and name in SQL and dataframe.
Expand All @@ -330,6 +326,6 @@ trait TypeWideningInsertSchemaEvolutionTests
.add("m", MapType(StringType, new StructType()
.add("x", IntegerType)
.add("y", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
result = Seq("""{ "a": 1 }""", """{ "a": 10 }"""),
resultSchema = new StructType()
.add("a", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
)

testTypeEvolution("change top-level column short -> int with insert")(
Expand All @@ -205,7 +205,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
result = Seq("""{ "a": 0 }""", """{ "a": 10 }""", """{ "a": 20 }"""),
resultSchema = new StructType()
.add("a", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
)

testTypeEvolution("updating using narrower value doesn't evolve schema")(
Expand Down Expand Up @@ -234,7 +234,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
"""{ "a": 1, "b": 5 }""", """{ "a": 10, "b": 15 }"""),
resultSchema = new StructType()
.add("a", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
.add("b", ShortType)
)

Expand All @@ -253,7 +253,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
resultSchema = new StructType()
.add("s", new StructType()
.add("a", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
)

testTypeEvolution("automatic widening of struct field with field assignment")(
Expand All @@ -271,7 +271,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
resultSchema = new StructType()
.add("s", new StructType()
.add("a", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
)

testTypeEvolution("automatic widening of map value")(
Expand All @@ -290,7 +290,6 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
MapType(StringType, IntegerType),
nullable = true,
metadata = typeWideningMetadata(
version = 1,
from = ShortType,
to = IntegerType,
path = Seq("value")))
Expand All @@ -311,7 +310,6 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
ArrayType(IntegerType),
nullable = true,
metadata = typeWideningMetadata(
version = 1,
from = ShortType,
to = IntegerType,
path = Seq("element")))
Expand All @@ -331,9 +329,9 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
result = Seq("""{ "a": 1, "b": 4 }""", """{ "a": 5, "b": 6 }"""),
resultSchema = new StructType()
.add("a", ShortType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ByteType, to = ShortType))
metadata = typeWideningMetadata(from = ByteType, to = ShortType))
.add("b", IntegerType, nullable = true,
metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))
metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
)

for (enabled <- BOOLEAN_DOMAIN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,7 @@ trait TypeWideningMetadataEndToEndTests {
"metadata": {
"delta.typeChanges": [{
"toType": "integer",
"fromType": "short",
"tableVersion": 1
"fromType": "short"
}]
}
}]}""".stripMargin)
Expand All @@ -648,12 +647,10 @@ trait TypeWideningMetadataEndToEndTests {
"metadata": {
"delta.typeChanges": [{
"toType": "short",
"fromType": "byte",
"tableVersion": 1
"fromType": "byte"
},{
"toType": "integer",
"fromType": "short",
"tableVersion": 2
"fromType": "short"
}]
}
}]}""".stripMargin)
Expand All @@ -678,8 +675,7 @@ trait TypeWideningMetadataEndToEndTests {
"metadata": {
"delta.typeChanges": [{
"toType": "short",
"fromType": "byte",
"tableVersion": 2
"fromType": "byte"
}]
}
}]
Expand All @@ -691,7 +687,6 @@ trait TypeWideningMetadataEndToEndTests {
"delta.typeChanges": [{
"toType": "integer",
"fromType": "byte",
"tableVersion": 1,
"fieldPath": "key"
}]
}
Expand All @@ -717,7 +712,6 @@ trait TypeWideningMetadataEndToEndTests {
"delta.typeChanges": [{
"toType": "short",
"fromType": "byte",
"tableVersion": 1,
"fieldPath": "element"
}]
}
Expand All @@ -735,8 +729,7 @@ trait TypeWideningMetadataEndToEndTests {
"metadata": {
"delta.typeChanges": [{
"toType": "integer",
"fromType": "short",
"tableVersion": 2
"fromType": "short"
}]
}
}]
Expand Down
Loading

0 comments on commit cc68565

Please sign in to comment.