From 199c1a2a62d1774cd799128c48d995b8673e1ba5 Mon Sep 17 00:00:00 2001 From: Anoop Johnson Date: Sun, 2 Mar 2025 07:51:45 -0800 Subject: [PATCH] [Kernel] New tests for Timestamp_NTZ writes Adds new unit tests for timestampNtz writes. Note that most of the write functionality (creation, inserts, insert with partition columns) are already exercised by DeltaTableWritesSuite. This PR adds a few more tests. --- .../kernel/internal/actions/Protocol.java | 19 ++++++ .../internal/actions/ProtocolSuite.scala | 20 ++++++ .../defaults/DeltaTableFeaturesSuite.scala | 67 ++++++++++++++++++- .../defaults/DeltaTableWriteSuiteBase.scala | 6 +- 4 files changed, 106 insertions(+), 6 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java index 1e08823ca6f..29855c3393d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java @@ -45,6 +45,25 @@ public class Protocol { /// Public static variables and methods /// ///////////////////////////////////////////////////////////////////////////////////////////////// + /** + * Helper method to get the Protocol from the row representation. + * + * @param row Row representation of the Protocol. + * @return the Protocol object + */ + public static Protocol fromRow(Row row) { + requireNonNull(row); + Set readerFeatures = + row.isNullAt(2) + ? Collections.emptySet() + : Collections.unmodifiableSet(new HashSet<>(VectorUtils.toJavaList(row.getArray(2)))); + Set writerFeatures = + row.isNullAt(3) + ? Collections.emptySet() + : Collections.unmodifiableSet(new HashSet<>(VectorUtils.toJavaList(row.getArray(3)))); + return new Protocol(row.getInt(0), row.getInt(1), readerFeatures, writerFeatures); + } + public static Protocol fromColumnVector(ColumnVector vector, int rowId) { if (vector.isNullAt(rowId)) { return null; diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/actions/ProtocolSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/actions/ProtocolSuite.scala index 3c9768f091b..3155526f40b 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/actions/ProtocolSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/actions/ProtocolSuite.scala @@ -17,7 +17,10 @@ package io.delta.kernel.internal.actions import scala.collection.JavaConverters._ +import io.delta.kernel.internal.data.GenericRow import io.delta.kernel.internal.tablefeatures.TableFeatures +import io.delta.kernel.internal.util.VectorUtils +import io.delta.kernel.types.{ArrayType, IntegerType, StringType, StructType} import org.scalatest.funsuite.AnyFunSuite @@ -369,4 +372,21 @@ class ProtocolSuite extends AnyFunSuite { assert(merged.getWriterFeatures.asScala === expWriterFeatures) } }) + + test("extract protocol from the row representation") { + val ordinalToValue: Map[Integer, Object] = Map( + Integer.valueOf(0) -> Integer.valueOf(42), + Integer.valueOf(1) -> Integer.valueOf(43), + Integer.valueOf(2) -> VectorUtils.stringArrayValue(List("foo").asJava).asInstanceOf[Object], + Integer.valueOf(3) -> VectorUtils.stringArrayValue(List("bar").asJava).asInstanceOf[Object]) + val row = new GenericRow( + new StructType().add("minReaderVersion", IntegerType.INTEGER) + .add("minWriterVersion", IntegerType.INTEGER) + .add("readerFeatures", new ArrayType(StringType.STRING, true)) + .add("writerFeatures", new ArrayType(StringType.STRING, true)), + ordinalToValue.asJava) + + val expected = new Protocol(42, 43, Set("foo").asJava, Set("bar").asJava) + assert(Protocol.fromRow(row) === expected) + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableFeaturesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableFeaturesSuite.scala index 25ec6cf3d4b..e397c45aa4b 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableFeaturesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableFeaturesSuite.scala @@ -15,12 +15,20 @@ */ package io.delta.kernel.defaults +import java.util.Collections + import scala.collection.immutable.Seq +import scala.jdk.CollectionConverters._ -import io.delta.kernel.engine.Engine +import io.delta.kernel.Operation.CREATE_TABLE +import io.delta.kernel.Table import io.delta.kernel.expressions.Literal +import io.delta.kernel.internal.actions.{Protocol => KernelProtocol} +import io.delta.kernel.types.{StructType, TimestampNTZType} +import io.delta.kernel.types.IntegerType.INTEGER +import io.delta.kernel.utils.CloseableIterable.emptyIterable -import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.{DeltaLog, DeltaTableFeatureException} import org.apache.spark.sql.delta.actions.Protocol /** @@ -132,6 +140,61 @@ class DeltaTableFeaturesSuite extends DeltaTableWriteSuiteBase { } } + // Test format: isTimestampNtzEnabled, expected protocol. + Seq( + (true, new KernelProtocol(3, 7, Set("timestampNtz").asJava, Set("timestampNtz").asJava)), + (false, new KernelProtocol(1, 2, Collections.emptySet(), Collections.emptySet()))) + .foreach({ + case (isTimestampNtzEnabled, expectedProtocol) => + test(s"Create table with timestampNtz enabled: $isTimestampNtzEnabled") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE) + + val schema = if (isTimestampNtzEnabled) { + new StructType().add("tz", TimestampNTZType.TIMESTAMP_NTZ) + } else { + new StructType().add("id", INTEGER) + } + val txn = txnBuilder + .withSchema(engine, schema) + .build(engine) + + assert(txn.getSchema(engine) === schema) + assert(txn.getPartitionColumns(engine).isEmpty) + val txnResult = commitTransaction(txn, engine, emptyIterable()) + + assert(txnResult.getVersion === 0) + val protocolRow = getProtocolActionFromCommit(engine, table, 0) + assert(protocolRow.isDefined) + val protocol = KernelProtocol.fromRow(protocolRow.get) + assert(protocol.getMinReaderVersion === expectedProtocol.getMinReaderVersion) + assert(protocol.getMinWriterVersion === expectedProtocol.getMinWriterVersion) + assert(protocol.getReaderFeatures.containsAll(expectedProtocol.getReaderFeatures)) + assert(protocol.getWriterFeatures.containsAll(expectedProtocol.getWriterFeatures)) + } + } + }) + + test("schema evolution from Spark to add TIMESTAMP_NTZ type on a table created with kernel") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE) + val txn = txnBuilder + .withSchema(engine, testSchema) + .build(engine) + val txnResult = commitTransaction(txn, engine, emptyIterable()) + + assert(txnResult.getVersion === 0) + assertThrows[DeltaTableFeatureException] { + spark.sql("ALTER TABLE delta.`" + tablePath + "` ADD COLUMN newCol TIMESTAMP_NTZ") + } + spark.sql("ALTER TABLE delta.`" + tablePath + + "` SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')") + spark.sql("ALTER TABLE delta.`" + tablePath + "` ADD COLUMN newCol TIMESTAMP_NTZ") + } + } + /////////////////////////////////////////////////////////////////////////// // Helper methods /////////////////////////////////////////////////////////////////////////// diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index eabca9523b8..85d6e01e94b 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -26,7 +26,6 @@ import io.delta.golden.GoldenTableUtils.goldenTablePath import io.delta.kernel.{Meta, Operation, Table, Transaction, TransactionBuilder, TransactionCommitResult} import io.delta.kernel.Operation.CREATE_TABLE import io.delta.kernel.data.{ColumnarBatch, ColumnVector, FilteredColumnarBatch, Row} -import io.delta.kernel.defaults.engine.DefaultEngine import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch import io.delta.kernel.defaults.utils.{TestRow, TestUtils} import io.delta.kernel.engine.Engine @@ -34,10 +33,9 @@ import io.delta.kernel.expressions.Literal import io.delta.kernel.expressions.Literal.ofInt import io.delta.kernel.hook.PostCommitHook.PostCommitHookType import io.delta.kernel.internal.{SnapshotImpl, TableConfig, TableImpl} -import io.delta.kernel.internal.actions.{Metadata, Protocol, SingleAction} +import io.delta.kernel.internal.actions.SingleAction import io.delta.kernel.internal.fs.{Path => DeltaPath} -import io.delta.kernel.internal.util.Clock -import io.delta.kernel.internal.util.FileNames +import io.delta.kernel.internal.util.{Clock, FileNames, VectorUtils} import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames import io.delta.kernel.internal.util.Utils.singletonCloseableIterator import io.delta.kernel.internal.util.Utils.toCloseableIterator