diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
index 512c835a387..f6b0a1efba4 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
@@ -19,7 +19,6 @@ package io.delta.sharing.spark
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.delta.{
-  DeltaColumnMapping,
   DeltaErrors,
   DeltaTableUtils => TahoeDeltaTableUtils
 }
@@ -409,13 +408,15 @@ private[sharing] class DeltaSharingDataSource
       HadoopFsRelation(
         location = fileIndex,
         // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex.
-        // Dropping column mapping metadata because it is not relevant for partition schema.
-        partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(fileIndex.partitionSchema),
+        // Dropping delta metadata because it is not relevant for partition schema.
+        partitionSchema =
+          TahoeDeltaTableUtils.removeInternalDeltaMetadata(spark, fileIndex.partitionSchema),
         // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex, original comment:
         // We pass all table columns as `dataSchema` so that Spark will preserve the partition
         // column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
         // just append them to the end of `dataSchema`.
-        dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
+        dataSchema = TahoeDeltaTableUtils.removeInternalDeltaMetadata(
+          spark,
           TahoeDeltaTableUtils.removeInternalWriterMetadata(
             spark,
             SchemaUtils.dropNullTypeColumns(deltaSharingTableMetadata.metadata.schema)
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
index 6f92ecaf00c..1afcbd1c494 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceTypeWideningSuite.scala
@@ -98,24 +98,12 @@ class DeltaSharingDataSourceTypeWideningSuite
     }
   }
 
-  /** Short-hand for the type widening metadata for column `value` for the test table above. */
-  private val typeWideningMetadata: Metadata =
-    new MetadataBuilder()
-      .putMetadataArray(
-        "delta.typeChanges", Array(
-          new MetadataBuilder()
-            .putString("fromType", "short")
-            .putString("toType", "integer")
-            .build()))
-      .build()
-
   test(s"Delta sharing with type widening") {
     withTestTable { tableName =>
       testReadingDeltaShare(
         tableName,
         versionAsOf = None,
-        expectedSchema = new StructType()
-          .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
+        expectedSchema = new StructType().add("value", IntegerType),
         expectedResult = Seq(1, 2, 3, Int.MaxValue, 4, 5).toDF("value"))
     }
   }
@@ -125,15 +113,13 @@ class DeltaSharingDataSourceTypeWideningSuite
       testReadingDeltaShare(
         tableName,
         versionAsOf = Some(3),
-        expectedSchema = new StructType()
-          .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
+        expectedSchema = new StructType().add("value", IntegerType),
         expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))
 
       testReadingDeltaShare(
         tableName,
         versionAsOf = Some(2),
-        expectedSchema = new StructType()
-          .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
+        expectedSchema = new StructType().add("value", IntegerType),
         expectedResult = Seq(1, 2).toDF("value"))
 
       testReadingDeltaShare(
@@ -151,8 +137,7 @@ class DeltaSharingDataSourceTypeWideningSuite
         tableName,
         versionAsOf = None,
         filter = Some(col("value") === Int.MaxValue),
-        expectedSchema = new StructType()
-          .add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
+        expectedSchema = new StructType().add("value", IntegerType),
         expectedResult = Seq(Int.MaxValue).toDF("value"),
         expectedJsonPredicate = Seq(
           """
@@ -187,7 +172,7 @@ class DeltaSharingDataSourceTypeWideningSuite
         versionAsOf = None,
         filter = Some(col("part") === Int.MaxValue),
         expectedSchema = new StructType()
-          .add("part", IntegerType, nullable = true, metadata = typeWideningMetadata)
+          .add("part", IntegerType)
           .add("value", ShortType),
         expectedResult = Seq((Int.MaxValue, 4)).toDF("part", "value"),
         expectedJsonPredicate = Seq(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index 1260c348f87..22f1c2ec3b9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -620,13 +620,13 @@ class DeltaLog private(
     }
     HadoopFsRelation(
       fileIndex,
-      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        snapshot.metadata.partitionSchema),
+      partitionSchema = DeltaTableUtils.removeInternalDeltaMetadata(
+        spark, snapshot.metadata.partitionSchema),
       // We pass all table columns as `dataSchema` so that Spark will preserve the partition
       // column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
       // just append them to the end of `dataSchema`.
-      dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        DeltaTableUtils.removeInternalWriterMetadata(spark, dataSchema)
+      dataSchema = DeltaTableUtils.removeInternalDeltaMetadata(
+        spark, DeltaTableUtils.removeInternalWriterMetadata(spark, dataSchema)
       ),
       bucketSpec = bucketSpec,
       fileFormat(snapshot.protocol, snapshot.metadata),
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
index 4c7a41a3c9e..3f434cbdfd5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -645,6 +645,23 @@ object DeltaTableUtils extends PredicateHelper
     )
   }
 
+  /**
+   * Removes internal Delta metadata from the given schema. This typically includes metadata used
+   * by reader-writer table features that shouldn't leak outside of the table. Use
+   * [[removeInternalWriterMetadata]] in addition or instead to remove metadata for writer-only
+   * table features.
+   */
+  def removeInternalDeltaMetadata(spark: SparkSession, schema: StructType): StructType = {
+    val cleanedSchema = DeltaColumnMapping.dropColumnMappingMetadata(schema)
+
+    val conf = spark.sessionState.conf
+    if (conf.getConf(DeltaSQLConf.DELTA_TYPE_WIDENING_REMOVE_SCHEMA_METADATA)) {
+      TypeWideningMetadata.removeTypeWideningMetadata(cleanedSchema)._1
+    } else {
+      cleanedSchema
+    }
+  }
+
 }
 
 sealed abstract class UnresolvedPathBasedDeltaTableBase(path: String) extends UnresolvedLeafNode {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index a4bb58700e2..c6081855870 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -190,7 +190,7 @@ case class DeltaTableV2(
     val baseSchema = cdcRelation.map(_.schema).getOrElse {
       DeltaTableUtils.removeInternalWriterMetadata(spark, initialSnapshot.schema)
     }
-    DeltaColumnMapping.dropColumnMappingMetadata(baseSchema)
+    DeltaTableUtils.removeInternalDeltaMetadata(spark, baseSchema)
   }
 
   override def schema(): StructType = tableSchema
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
index 7487c717545..73ae9c1d60a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta._
-import org.apache.spark.sql.delta.DeltaColumnMapping.{dropColumnMappingMetadata, filterColumnMappingProperties}
+import org.apache.spark.sql.delta.DeltaColumnMapping.filterColumnMappingProperties
 import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol, TableFeatureProtocolUtils}
 import org.apache.spark.sql.delta.actions.DomainMetadata
 import org.apache.spark.sql.delta.commands.DMLUtils.TaggedCommitData
@@ -398,7 +398,7 @@ case class CreateDeltaTableCommand(
         ClusteredTableUtils.getDomainMetadataFromTransaction(
           ClusteredTableUtils.getClusterBySpecOptional(table), txn).toSeq
       } else {
-        verifyTableMetadata(txn, tableWithLocation)
+        verifyTableMetadata(sparkSession, txn, tableWithLocation)
         Nil
       }
     }
@@ -539,6 +539,7 @@ case class CreateDeltaTableCommand(
    * table.
    */
   private def verifyTableMetadata(
+      sparkSession: SparkSession,
       txn: OptimisticTransaction,
       tableDesc: CatalogTable): Unit = {
     val existingMetadata = txn.metadata
@@ -554,7 +555,7 @@ case class CreateDeltaTableCommand(
       // However, if in column mapping mode, we can safely ignore the related metadata fields in
       // existing metadata because new table desc will not have related metadata assigned yet
       val differences = SchemaUtils.reportDifferences(
-        dropColumnMappingMetadata(existingMetadata.schema),
+        DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, existingMetadata.schema),
         tableDesc.schema)
       if (differences.nonEmpty) {
         throw DeltaErrors.createTableWithDifferentSchemaException(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index f684714c39f..42bc912e31c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -1134,7 +1134,7 @@ case class AlterTableSetLocationDeltaCommand(
       val bypassSchemaCheck = sparkSession.sessionState.conf.getConf(
         DeltaSQLConf.DELTA_ALTER_LOCATION_BYPASS_SCHEMA_CHECK)
 
-      if (!bypassSchemaCheck && !schemasEqual(oldMetadata, newMetadata)) {
+      if (!bypassSchemaCheck && !schemasEqual(sparkSession, oldMetadata, newMetadata)) {
         throw DeltaErrors.alterTableSetLocationSchemaMismatchException(
           oldMetadata.schema, newMetadata.schema)
       }
@@ -1144,12 +1144,12 @@ case class AlterTableSetLocationDeltaCommand(
   }
 
   private def schemasEqual(
+      sparkSession: SparkSession,
       oldMetadata: actions.Metadata,
       newMetadata: actions.Metadata): Boolean = {
-    import DeltaColumnMapping._
-    dropColumnMappingMetadata(oldMetadata.schema) ==
-      dropColumnMappingMetadata(newMetadata.schema) &&
-      dropColumnMappingMetadata(oldMetadata.partitionSchema) ==
-      dropColumnMappingMetadata(newMetadata.partitionSchema)
+    DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, oldMetadata.schema) ==
+      DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, newMetadata.schema) &&
+      DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, oldMetadata.partitionSchema) ==
+      DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, newMetadata.partitionSchema)
   }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
index dab3b4411b8..c14ca8122d9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
@@ -110,7 +110,8 @@ class DeltaDataSource
       throw DeltaErrors.specifySchemaAtReadTimeException
     }
 
-    val schemaToUse = DeltaColumnMapping.dropColumnMappingMetadata(
+    val schemaToUse = DeltaTableUtils.removeInternalDeltaMetadata(
+      sqlContext.sparkSession,
       DeltaTableUtils.removeInternalWriterMetadata(sqlContext.sparkSession, readSchema)
     )
     if (schemaToUse.isEmpty) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index cdb46f48692..d23afc1ee79 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1295,6 +1295,14 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(false)
 
+  val DELTA_TYPE_WIDENING_REMOVE_SCHEMA_METADATA =
+    buildConf("typeWidening.removeSchemaMetadata")
+      .doc("When true, type widening metadata is removed from schemas that are surfaced outside " +
+        "of Delta or used for schema comparisons")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_IS_DELTA_TABLE_THROW_ON_ERROR =
     buildConf("isDeltaTable.throwOnError")
       .internal()
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index c581aea641a..f52e4279ae8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -230,7 +230,7 @@ trait DeltaSourceBase extends Source
     } else {
       readSchema
     }
-    DeltaColumnMapping.dropColumnMappingMetadata(readSchemaWithCdc)
+    DeltaTableUtils.removeInternalDeltaMetadata(spark, readSchemaWithCdc)
   }
 
   // A dummy empty dataframe that can be returned at various point during streaming
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningAlterTableNestedSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningAlterTableNestedSuite.scala
index 3ca4c95e532..106425efeed 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningAlterTableNestedSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningAlterTableNestedSuite.scala
@@ -107,34 +107,9 @@ trait TypeWideningAlterTableNestedTests {
 
     assert(readDeltaTable(tempPath).schema === new StructType()
       .add("s", new StructType()
-        .add("a", ShortType, nullable = true, metadata = new MetadataBuilder()
-          .putMetadataArray("delta.typeChanges", Array(
-            new MetadataBuilder()
-              .putString("toType", "short")
-              .putString("fromType", "byte")
-              .build()
-          )).build()))
-      .add("m", MapType(IntegerType, IntegerType), nullable = true, metadata = new MetadataBuilder()
-        .putMetadataArray("delta.typeChanges", Array(
-          new MetadataBuilder()
-            .putString("toType", "integer")
-            .putString("fromType", "byte")
-            .putString("fieldPath", "key")
-            .build(),
-          new MetadataBuilder()
-            .putString("toType", "integer")
-            .putString("fromType", "short")
-            .putString("fieldPath", "value")
-            .build()
-        )).build())
-      .add("a", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder()
-        .putMetadataArray("delta.typeChanges", Array(
-          new MetadataBuilder()
-            .putString("toType", "integer")
-            .putString("fromType", "short")
-            .putString("fieldPath", "element")
-            .build()
-        )).build()))
+        .add("a", ShortType))
+      .add("m", MapType(IntegerType, IntegerType))
+      .add("a", ArrayType(IntegerType)))
 
     append(Seq((5, 6, 7, 8))
       .toDF("a", "b", "c", "d")
@@ -153,34 +128,9 @@ trait TypeWideningAlterTableNestedTests {
       "(s struct<a: short>, m map<int, int>, a array<int>)")
     assert(readDeltaTable(tempPath).schema === new StructType()
       .add("s", new StructType()
-        .add("a", ShortType, nullable = true, metadata = new MetadataBuilder()
-          .putMetadataArray("delta.typeChanges", Array(
-            new MetadataBuilder()
-              .putString("toType", "short")
-              .putString("fromType", "byte")
-              .build()
-          )).build()))
-      .add("m", MapType(IntegerType, IntegerType), nullable = true, metadata = new MetadataBuilder()
-        .putMetadataArray("delta.typeChanges", Array(
-          new MetadataBuilder()
-            .putString("toType", "integer")
-            .putString("fromType", "byte")
-            .putString("fieldPath", "key")
-            .build(),
-          new MetadataBuilder()
-            .putString("toType", "integer")
-            .putString("fromType", "short")
-            .putString("fieldPath", "value")
-            .build()
-        )).build())
-      .add("a", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder()
-        .putMetadataArray("delta.typeChanges", Array(
-          new MetadataBuilder()
-            .putString("toType", "integer")
-            .putString("fromType", "short")
-            .putString("fieldPath", "element")
-            .build()
-        )).build()))
+        .add("a", ShortType))
+      .add("m", MapType(IntegerType, IntegerType))
+      .add("a", ArrayType(IntegerType)))
 
     append(Seq((5, 6, 7, 8))
       .toDF("a", "b", "c", "d")
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningAlterTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningAlterTableSuite.scala
index 74b70ab95f2..b886c6cf95c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningAlterTableSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningAlterTableSuite.scala
@@ -135,15 +135,7 @@ trait TypeWideningAlterTableTests
     append(Seq(1, 2).toDF("value").select($"value".cast(ShortType)))
     assert(readDeltaTable(tempPath).schema === new StructType().add("value", ShortType))
     sql(s"ALTER TABLE delta.`$tempPath` REPLACE COLUMNS (value INT)")
-    assert(readDeltaTable(tempPath).schema ===
-      new StructType()
-        .add("value", IntegerType, nullable = true, metadata = new MetadataBuilder()
-          .putMetadataArray("delta.typeChanges", Array(
-            new MetadataBuilder()
-              .putString("toType", "integer")
-              .putString("fromType", "short")
-              .build()
-          )).build()))
+    assert(readDeltaTable(tempPath).schema === new StructType().add("value", IntegerType))
     checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2)))
     append(Seq(3, 4).toDF("value"))
     checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3), Row(4)))
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala
index e5969efda01..28023eeb581 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningFeatureCompatibilitySuite.scala
@@ -130,8 +130,7 @@ trait TypeWideningCompatibilityTests
 
     append(Seq((2.toShort, "ghi", "jkl")).toDF("a", "c", "v"))
     assert(readDeltaTable(tempPath).schema === new StructType()
-      .add("a", ShortType, nullable = true,
-        metadata = typeWideningMetadata(ByteType, ShortType))
+      .add("a", ShortType)
       .add("c", StringType, nullable = true,
         metadata = new MetadataBuilder()
           .putString("__CHAR_VARCHAR_TYPE_STRING", "char(3)")
@@ -148,8 +147,7 @@ trait TypeWideningCompatibilityTests
 
     append(Seq((3.toShort, "longer string 1", "longer string 2")).toDF("a", "c", "v"))
     assert(readDeltaTable(tempPath).schema === new StructType()
-      .add("a", ShortType, nullable = true,
-        metadata = typeWideningMetadata(ByteType, ShortType))
+      .add("a", ShortType)
       .add("c", StringType)
      .add("v", StringType))
     checkAnswer(readDeltaTable(tempPath),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala
index 89b49164819..96b3305b74f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala
@@ -185,8 +185,7 @@ trait TypeWideningInsertSchemaEvolutionTests
     insertData = TestData("a int, b int", Seq("""{ "a": 1, "b": 4 }""")),
     expectedResult = ExpectedResult.Success(new StructType()
       .add("a", IntegerType)
-      .add("b", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
+      .add("b", IntegerType))
   )
 
   testInserts("top-level type evolution with column upcast")(
@@ -196,8 +195,7 @@ trait TypeWideningInsertSchemaEvolutionTests
     insertData = TestData("a int, b int, c short", Seq("""{ "a": 1, "b": 5, "c": 6 }""")),
     expectedResult = ExpectedResult.Success(new StructType()
       .add("a", IntegerType)
-      .add("b", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+      .add("b", IntegerType)
       .add("c", IntegerType))
   )
 
@@ -208,8 +206,7 @@ trait TypeWideningInsertSchemaEvolutionTests
     insertData = TestData("a int, b int, c int", Seq("""{ "a": 1, "b": 4, "c": 5 }""")),
     expectedResult = ExpectedResult.Success(new StructType()
       .add("a", IntegerType)
-      .add("b", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+      .add("b", IntegerType)
       .add("c", IntegerType)),
     // SQL INSERT by name doesn't support schema evolution.
     excludeInserts = insertsSQL.intersect(insertsByName)
   )
@@ -229,18 +226,9 @@ trait TypeWideningInsertSchemaEvolutionTests
       .add("key", IntegerType)
       .add("s", new StructType()
         .add("x", ShortType)
-        .add("y", IntegerType, nullable = true,
-          metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
-      .add("m", MapType(StringType, IntegerType), nullable = true,
-        metadata = typeWideningMetadata(
-          from = ShortType,
-          to = IntegerType,
-          path = Seq("value")))
-      .add("a", ArrayType(IntegerType), nullable = true,
-        metadata = typeWideningMetadata(
-          from = ShortType,
-          to = IntegerType,
-          path = Seq("element"))))
+        .add("y", IntegerType))
+      .add("m", MapType(StringType, IntegerType))
+      .add("a", ArrayType(IntegerType)))
   )
 
@@ -257,19 +245,10 @@ trait TypeWideningInsertSchemaEvolutionTests
       .add("key", IntegerType)
       .add("s", new StructType()
         .add("x", ShortType)
-        .add("y", IntegerType, nullable = true,
-          metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+        .add("y", IntegerType)
         .add("z", IntegerType))
-      .add("m", MapType(StringType, IntegerType), nullable = true,
-        metadata = typeWideningMetadata(
-          from = ShortType,
-          to = IntegerType,
-          path = Seq("value")))
-      .add("a", ArrayType(IntegerType), nullable = true,
-        metadata = typeWideningMetadata(
-          from = ShortType,
-          to = IntegerType,
-          path = Seq("element"))))
+      .add("m", MapType(StringType, IntegerType))
+      .add("a", ArrayType(IntegerType)))
   )
 
@@ -286,8 +265,7 @@ trait TypeWideningInsertSchemaEvolutionTests
       .add("key", IntegerType)
       .add("s", new StructType()
         .add("x", IntegerType)
-        .add("y", IntegerType, nullable = true,
-          metadata = typeWideningMetadata(from = ShortType, to = IntegerType))))
+        .add("y", IntegerType)))
   )
 
   // Interestingly, we introduced a special case to handle schema evolution / casting for structs
@@ -306,8 +284,7 @@ trait TypeWideningInsertSchemaEvolutionTests
       .add("key", IntegerType)
       .add("a", ArrayType(new StructType()
         .add("x", IntegerType)
-        .add("y", IntegerType, nullable = true,
-          metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))))
+        .add("y", IntegerType))))
   )
 
   // maps now allow type evolution for INSERT by position and name in SQL and dataframe.
@@ -325,7 +302,6 @@ trait TypeWideningInsertSchemaEvolutionTests
       // Type evolution was applied in the map.
       .add("m", MapType(StringType, new StructType()
         .add("x", IntegerType)
-        .add("y", IntegerType, nullable = true,
-          metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))))
+        .add("y", IntegerType))))
   )
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMergeIntoSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMergeIntoSchemaEvolutionSuite.scala
index cd83b911a0f..bd0ab0225ac 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMergeIntoSchemaEvolutionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMergeIntoSchemaEvolutionSuite.scala
@@ -190,9 +190,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
     cond = "t.a = s.a",
     clauses = update("a = s.a + 1") :: Nil,
     result = Seq("""{ "a": 1 }""", """{ "a": 10 }"""),
-    resultSchema = new StructType()
-      .add("a", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+    resultSchema = new StructType().add("a", IntegerType)
   )
 
   testTypeEvolution("change top-level column short -> int with insert")(
@@ -203,9 +201,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
     cond = "t.a = s.a",
     clauses = insert("(a) VALUES (s.a)") :: Nil,
     result = Seq("""{ "a": 0 }""", """{ "a": 10 }""", """{ "a": 20 }"""),
-    resultSchema = new StructType()
-      .add("a", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+    resultSchema = new StructType().add("a", IntegerType)
   )
 
   testTypeEvolution("updating using narrower value doesn't evolve schema")(
@@ -233,8 +229,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
     result = Seq(
       """{ "a": 1, "b": 5 }""", """{ "a": 10, "b": 15 }"""),
     resultSchema = new StructType()
-      .add("a", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+      .add("a", IntegerType)
       .add("b", ShortType)
   )
 
@@ -252,8 +247,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
     result = Seq("""{ "s": { "a": 2 } }""", """{ "s": { "a": 10 } }"""),
     resultSchema = new StructType()
       .add("s", new StructType()
-        .add("a", IntegerType, nullable = true,
-          metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
+        .add("a", IntegerType))
   )
 
   testTypeEvolution("automatic widening of struct field with field assignment")(
@@ -270,8 +264,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
     result = Seq("""{ "s": { "a": 2 } }""", """{ "s": { "a": 10 } }"""),
     resultSchema = new StructType()
       .add("s", new StructType()
-        .add("a", IntegerType, nullable = true,
-          metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
+        .add("a", IntegerType))
   )
 
   testTypeEvolution("automatic widening of map value")(
@@ -286,13 +279,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
     clauses = update("t.m = s.m") :: Nil,
     result = Seq("""{ "m": { "a": 2 } }"""),
     resultSchema = new StructType()
-      .add("m",
-        MapType(StringType, IntegerType),
-        nullable = true,
-        metadata = typeWideningMetadata(
-          from = ShortType,
-          to = IntegerType,
-          path = Seq("value")))
+      .add("m", MapType(StringType, IntegerType))
   )
 
   testTypeEvolution("automatic widening of array element")(
@@ -306,13 +293,7 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
     clauses = update("t.a = s.a") :: Nil,
     result = Seq("""{ "a": [3, 4] }"""),
     resultSchema = new StructType()
-      .add("a",
-        ArrayType(IntegerType),
-        nullable = true,
-        metadata = typeWideningMetadata(
-          from = ShortType,
-          to = IntegerType,
-          path = Seq("element")))
+      .add("a", ArrayType(IntegerType))
   )
 
   testTypeEvolution("multiple automatic widening")(
@@ -328,10 +309,8 @@ trait TypeWideningMergeIntoSchemaEvolutionTests
     clauses = update("*") :: insert("*") :: Nil,
     result = Seq("""{ "a": 1, "b": 4 }""", """{ "a": 5, "b": 6 }"""),
     resultSchema = new StructType()
-      .add("a", ShortType, nullable = true,
-        metadata = typeWideningMetadata(from = ByteType, to = ShortType))
-      .add("b", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+      .add("a", ShortType)
+      .add("b", IntegerType)
   )
 
   for (enabled <- BOOLEAN_DOMAIN)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala
index 26ae425fa18..d28f8bc4c80 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala
@@ -16,12 +16,14 @@ package org.apache.spark.sql.delta.typewidening
 
+import java.io.File
+
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.propertyKey
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.util.JsonUtils
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.types._
 
@@ -33,6 +35,7 @@ class TypeWideningMetadataSuite
     with TypeWideningTestMixin
     with TypeWideningMetadataTests
     with TypeWideningMetadataEndToEndTests
+    with TypeWideningLeakingMetadataTests
 
 /**
  * Tests covering the [[TypeWideningMetadata]] and [[TypeChange]] classes used to handle the
@@ -626,12 +629,7 @@ trait TypeWideningMetadataEndToEndTests {
         "name": "a",
         "type": "integer",
         "nullable": true,
-        "metadata": {
-          "delta.typeChanges": [{
-            "toType": "integer",
-            "fromType": "short"
-          }]
-        }
+        "metadata": {}
       }]}""".stripMargin)
 
   testTypeWideningMetadata("change top-level column type twice byte->short->int")(
@@ -644,15 +642,7 @@ trait TypeWideningMetadataEndToEndTests {
         "name": "a",
         "type": "integer",
         "nullable": true,
-        "metadata": {
-          "delta.typeChanges": [{
-            "toType": "short",
-            "fromType": "byte"
-          },{
-            "toType": "integer",
-            "fromType": "short"
-          }]
-        }
+        "metadata": {}
       }]}""".stripMargin)
 
   testTypeWideningMetadata("change type in map key and in struct in map value")(
@@ -672,24 +662,13 @@ trait TypeWideningMetadataEndToEndTests {
               "name": "b",
               "type": "short",
               "nullable": true,
-              "metadata": {
-                "delta.typeChanges": [{
-                  "toType": "short",
-                  "fromType": "byte"
-                }]
-              }
+              "metadata": {}
             }]
           },
          "valueContainsNull": true
        },
        "nullable": true,
-        "metadata": {
-          "delta.typeChanges": [{
-            "toType": "integer",
-            "fromType": "byte",
-            "fieldPath": "key"
-          }]
-        }
+        "metadata": {}
      }
  ]}""".stripMargin)
 
@@ -708,13 +687,7 @@ trait TypeWideningMetadataEndToEndTests {
          "containsNull": true
        },
        "nullable": true,
-        "metadata": {
-          "delta.typeChanges": [{
-            "toType": "short",
-            "fromType": "byte",
-            "fieldPath": "element"
-          }]
-        }
+        "metadata": {}
      },
      {
        "name": "b",
@@ -726,12 +699,7 @@ trait TypeWideningMetadataEndToEndTests {
           "name": "c",
           "type": "integer",
           "nullable": true,
-          "metadata": {
-            "delta.typeChanges": [{
-              "toType": "integer",
-              "fromType": "short"
-            }]
-          }
+          "metadata": {}
         }]
       },
       "containsNull": true
@@ -741,3 +709,41 @@ trait TypeWideningMetadataEndToEndTests {
     }
   ]}""".stripMargin)
 }
+
+
+trait TypeWideningLeakingMetadataTests {
+  self: QueryTest with TypeWideningTestMixin =>
+
+  test("stream read from type widening does not leak metadata") {
+    val (t1, t2) = ("type_widening_table_1", "type_widening_table_2")
+    withTable(t1, t2) {
+      withTempDir { dir =>
+        sql(s"CREATE TABLE $t1 (part BYTE, value SHORT) USING DELTA PARTITIONED BY (part)")
+        sql(s"INSERT INTO $t1 VALUES (1, 1), (2, 2)")
+        // Change type of both partition and non-partition columns.
+        sql(s"ALTER TABLE $t1 CHANGE COLUMN part TYPE INT")
+        sql(s"ALTER TABLE $t1 CHANGE COLUMN value TYPE INT")
+        // Stream read from source table
+        val streamDf = spark.readStream.format("delta").table(t1)
+        // Should not contain type widening metadata
+        assert(streamDf.schema.forall(_.metadata.json == "{}"))
+
+        // Create and write to another table
+        val q = streamDf.writeStream
+          .partitionBy("part")
+          .trigger(org.apache.spark.sql.streaming.Trigger.AvailableNow())
+          .format("delta")
+          .option("checkpointLocation", new File(dir, "_checkpoint1").getCanonicalPath)
+          .toTable(t2)
+        q.awaitTermination()
+
+        // Check target table Delta log
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t2))
+        assert(deltaLog.update().metadata.schema.forall(_.metadata.json == "{}"))
+
+        // Check target table data
+        checkAnswer(spark.table(t2), Seq(Row(1, 1), Row(2, 2)))
+      }
+    }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSinkSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSinkSuite.scala
index 645add2a388..ac209910098 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSinkSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningStreamingSinkSuite.scala
@@ -143,8 +143,7 @@ class TypeWideningStreamingSinkSuite
 
     stream.write((12, 3456))("CAST(_1 AS INT) AS a", "CAST(_2 AS DECIMAL(10, 2)) AS b")
     assert(stream.currentSchema === new StructType()
-      .add("a", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+      .add("a", IntegerType)
       .add("b", DecimalType(10, 2)))
     checkAnswer(stream.read(), Row(17, null) :: Row(12, 3456) :: Nil)
   }
@@ -160,8 +159,7 @@ class TypeWideningStreamingSinkSuite
 
     stream.write((12, -1))("CAST(_1 AS INT) AS a")
     assert(stream.currentSchema === new StructType()
-      .add("a", IntegerType, nullable = true,
-        metadata = typeWideningMetadata(from = ShortType, to = IntegerType))
+      .add("a", IntegerType)
       .add("b", LongType))
     checkAnswer(stream.read(), Row(17, 45) :: Row(12, null) :: Nil)
   }
@@ -188,8 +186,7 @@ class TypeWideningStreamingSinkSuite
 
     assert(stream.currentSchema === new StructType().add("c", ShortType))
     stream.write((12, -1))("CAST(_1 AS INT) AS c")
-    assert(stream.currentSchema === new StructType().add("c", IntegerType, nullable = true,
-      metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
+    assert(stream.currentSchema === new StructType().add("c", IntegerType))
     checkAnswer(stream.read(), Row(17) :: Row(12) :: Nil)
   }
 }
@@ -220,8 +217,7 @@ class TypeWideningStreamingSinkSuite
     val data = Seq(2, 3).toDF("value").selectExpr("CAST(value AS INT)")
     sink.addBatch(1, data)
     val df = spark.read.format("delta").load(tablePath)
-    assert(df.schema === new StructType().add("value", IntegerType, nullable = true,
-      metadata = typeWideningMetadata(from = ShortType, to = IntegerType)))
+    assert(df.schema === new StructType().add("value", IntegerType))
     checkAnswer(df, Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil)
   }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
index e263f878cb6..878fca32811 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
@@ -826,7 +826,7 @@ trait TypeWideningTableFeatureTests
 
     addSingleFile(Seq(1), ByteType)
     sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE short")
-    assert(readDeltaTable(tempPath).schema === new StructType()
+    assert(deltaLog.update().metadata.schema === new StructType()
       .add("a", ShortType, nullable = true, metadata = new MetadataBuilder()
         .putMetadataArray("delta.typeChanges", Array(
           new MetadataBuilder()
@@ -840,7 +840,7 @@ trait TypeWideningTableFeatureTests
     // specs are compatible. In that case, we still populate the `tableVersion` field.
     addTableFeature(tempPath, TypeWideningTableFeature)
     sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int")
-    assert(readDeltaTable(tempPath).schema === new StructType()
+    assert(deltaLog.update().metadata.schema === new StructType()
      .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder()
        .putMetadataArray("delta.typeChanges", Array(
          new MetadataBuilder()
@@ -864,7 +864,7 @@ trait TypeWideningTableFeatureTests
     enableTypeWidening(tempPath)
     addSingleFile(Seq(1), ByteType)
     sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE short")
-    assert(readDeltaTable(tempPath).schema === new StructType()
+    assert(deltaLog.update().metadata.schema === new StructType()
      .add("a", ShortType, nullable = true, metadata = new MetadataBuilder()
        .putMetadataArray("delta.typeChanges", Array(
          new MetadataBuilder()