diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java index 173168ff1a0..50162b065fd 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -33,7 +33,14 @@ enum PostCommitHookType { * perform this operation, reading previous checkpoint + logs is required to construct a new * checkpoint, with latency scaling based on log size (typically seconds to minutes). */ - CHECKPOINT + CHECKPOINT, + /** + * Writes a checksum file at the version committed by the transaction. This hook is present only + * when every table statistic the checksum file requires (e.g. table size) is known at commit + * time. The operation has minimal latency because it does not need to read the previous + * checkpoint or logs. + */ + CHECKSUM_SIMPLE } /** Invokes the post commit operation whose implementation must be thread safe. 
*/ diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index be0497d3e41..77b51352b29 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -25,10 +25,10 @@ import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.checksum.CRCInfo; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.metrics.SnapshotQueryContext; import io.delta.kernel.internal.metrics.SnapshotReportImpl; -import io.delta.kernel.internal.replay.CRCInfo; import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 1dba7458002..e950126acb6 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -29,9 +29,11 @@ import io.delta.kernel.expressions.Column; import io.delta.kernel.hook.PostCommitHook; import io.delta.kernel.internal.actions.*; +import io.delta.kernel.internal.checksum.CRCInfo; import io.delta.kernel.internal.data.TransactionStateRow; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.hook.CheckpointHook; +import io.delta.kernel.internal.hook.ChecksumSimpleHook; import io.delta.kernel.internal.metrics.TransactionMetrics; import io.delta.kernel.internal.metrics.TransactionReportImpl; import io.delta.kernel.internal.replay.ConflictChecker; @@ -39,6 +41,7 @@ import 
io.delta.kernel.internal.rowtracking.RowTracking; import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.*; +import io.delta.kernel.metrics.TransactionMetricsResult; import io.delta.kernel.metrics.TransactionReport; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterable; @@ -342,6 +345,7 @@ private TransactionCommitResult doCommit( dataAndMetadataActions.map( action -> { transactionMetrics.totalActionsCounter.increment(); + // TODO: handle RemoveFiles. if (!action.isNullAt(ADD_FILE_ORDINAL)) { transactionMetrics.addFilesCounter.increment(); transactionMetrics.addFilesSizeInBytesCounter.increment( @@ -362,6 +366,10 @@ private TransactionCommitResult doCommit( postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion)); } + buildPostCommitCrcInfoIfCurrentCrcAvailable( + commitAsVersion, transactionMetrics.captureTransactionMetricsResult()) + .ifPresent(crcInfo -> postCommitHooks.add(new ChecksumSimpleHook(crcInfo, logPath))); + return new TransactionCommitResult(commitAsVersion, postCommitHooks); } catch (FileAlreadyExistsException e) { throw e; @@ -437,6 +445,36 @@ private void recordTransactionReport( engine.getMetricsReporters().forEach(reporter -> reporter.report(transactionReport)); } + private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable( + long commitAtVersion, TransactionMetricsResult metricsResult) { + if (isNewTable) { + return Optional.of( + new CRCInfo( + commitAtVersion, + metadata, + protocol, + metricsResult.getTotalAddFilesSizeInBytes(), + metricsResult.getNumAddFiles(), + Optional.of(txnId.toString()))); + } + + return readSnapshot + .getCurrentCrcInfo() + // After a conflicting txn and a successful retry, readSnapshot may be older than + // commitAtVersion - 1; its CRC then cannot be extended, so no checksum hook is added. + .filter(lastCrcInfo -> commitAtVersion == lastCrcInfo.getVersion() + 1) + .map( + lastCrcInfo -> + new CRCInfo( + commitAtVersion, + metadata, + protocol, + // TODO: handle RemoveFiles for calculating 
table size and num of files. + lastCrcInfo.getTableSizeBytes() + metricsResult.getTotalAddFilesSizeInBytes(), + lastCrcInfo.getNumFiles() + metricsResult.getNumAddFiles(), + Optional.of(txnId.toString()))); + } + /** * Get the part of the schema of the table that needs the statistics to be collected per file. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/CRCInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/CRCInfo.java new file mode 100644 index 00000000000..b4d72dc89ad --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/CRCInfo.java @@ -0,0 +1,164 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.checksum; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.internal.util.InternalUtils; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CRCInfo { + private static final Logger logger = LoggerFactory.getLogger(CRCInfo.class); + + // Constants for schema field names + private static final String TABLE_SIZE_BYTES = "tableSizeBytes"; + private static final String NUM_FILES = "numFiles"; + private static final String NUM_METADATA = "numMetadata"; + private static final String NUM_PROTOCOL = "numProtocol"; + private static final String METADATA = "metadata"; + private static final String PROTOCOL = "protocol"; + private static final String TXN_ID = "txnId"; + + public static final StructType CRC_FILE_SCHEMA = + new StructType() + .add(TABLE_SIZE_BYTES, LongType.LONG) + .add(NUM_FILES, LongType.LONG) + .add(NUM_METADATA, LongType.LONG) + .add(NUM_PROTOCOL, LongType.LONG) + .add(METADATA, Metadata.FULL_SCHEMA) + .add(PROTOCOL, Protocol.FULL_SCHEMA) + .add(TXN_ID, StringType.STRING, /*nullable*/ true); + + public static Optional<CRCInfo> fromColumnarBatch( + long version, ColumnarBatch batch, int rowId, String crcFilePath) { + // Read required fields. 
+ Protocol protocol = + Protocol.fromColumnVector(batch.getColumnVector(getSchemaIndex(PROTOCOL)), rowId); + Metadata metadata = + Metadata.fromColumnVector(batch.getColumnVector(getSchemaIndex(METADATA)), rowId); + long tableSizeBytes = + InternalUtils.requireNonNull( + batch.getColumnVector(getSchemaIndex(TABLE_SIZE_BYTES)), rowId, TABLE_SIZE_BYTES) + .getLong(rowId); + long numFiles = + InternalUtils.requireNonNull( + batch.getColumnVector(getSchemaIndex(NUM_FILES)), rowId, NUM_FILES) + .getLong(rowId); + + // Read optional fields + ColumnVector txnIdColumnVector = batch.getColumnVector(getSchemaIndex(TXN_ID)); + Optional<String> txnId = + txnIdColumnVector.isNullAt(rowId) + ? Optional.empty() + : Optional.of(txnIdColumnVector.getString(rowId)); + + // protocol and metadata are nullable per fromColumnVector's implementation. + if (protocol == null || metadata == null) { + logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath); + return Optional.empty(); + } + return Optional.of(new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txnId)); + } + + private final long version; + private final Metadata metadata; + private final Protocol protocol; + private final long tableSizeBytes; + private final long numFiles; + private final Optional<String> txnId; + + public CRCInfo( + long version, + Metadata metadata, + Protocol protocol, + long tableSizeBytes, + long numFiles, + Optional<String> txnId) { + checkArgument(tableSizeBytes >= 0); + checkArgument(numFiles >= 0); + this.version = version; + this.metadata = requireNonNull(metadata); + this.protocol = requireNonNull(protocol); + this.tableSizeBytes = tableSizeBytes; + this.numFiles = numFiles; + this.txnId = requireNonNull(txnId); + } + + /** The version of the Delta table that this CRCInfo represents. */ + public long getVersion() { + return version; + } + + /** The {@link Metadata} stored in this CRCInfo. 
+ */ + public Metadata getMetadata() { + return metadata; + } + + /** The {@link Protocol} stored in this CRCInfo. */ + public Protocol getProtocol() { + return protocol; + } + + public long getNumFiles() { + return numFiles; + } + + public long getTableSizeBytes() { + return tableSizeBytes; + } + + public Optional<String> getTxnId() { + return txnId; + } + + /** + * Encode as a {@link Row} object with the schema {@link CRCInfo#CRC_FILE_SCHEMA}. + * + * @return {@link Row} object with the schema {@link CRCInfo#CRC_FILE_SCHEMA} + */ + public Row toRow() { + Map<Integer, Object> values = new HashMap<>(); + // Add required fields + values.put(getSchemaIndex(TABLE_SIZE_BYTES), tableSizeBytes); + values.put(getSchemaIndex(NUM_FILES), numFiles); + values.put(getSchemaIndex(NUM_METADATA), 1L); + values.put(getSchemaIndex(NUM_PROTOCOL), 1L); + values.put(getSchemaIndex(METADATA), metadata.toRow()); + values.put(getSchemaIndex(PROTOCOL), protocol.toRow()); + + // Add optional fields + txnId.ifPresent(txn -> values.put(getSchemaIndex(TXN_ID), txn)); + return new GenericRow(CRC_FILE_SCHEMA, values); + } + + private static int getSchemaIndex(String fieldName) { + return CRC_FILE_SCHEMA.indexOf(fieldName); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumReader.java similarity index 98% rename from kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumReader.java index 314c456afe8..eeb651a34f5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumReader.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.delta.kernel.internal.replay; +package io.delta.kernel.internal.checksum; import static io.delta.kernel.internal.util.FileNames.*; import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; @@ -96,7 +96,7 @@ private static Optional readChecksumFile(Engine engine, Path filePath) .getJsonHandler() .readJsonFiles( singletonCloseableIterator(FileStatus.of(filePath.toString())), - CRCInfo.FULL_SCHEMA, + CRCInfo.CRC_FILE_SCHEMA, Optional.empty())) { // We do this instead of iterating through the rows or using `getSingularRow` so we // can use the existing fromColumnVector methods in Protocol, Metadata, Format etc diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java new file mode 100644 index 00000000000..caec865694a --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java @@ -0,0 +1,58 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.checksum; + +import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; +import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.util.FileNames; +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Writes the checksum file of a table version under the given Delta log path. */ +public class ChecksumWriter { + + private static final Logger logger = LoggerFactory.getLogger(ChecksumWriter.class); + + private final Path logPath; + + public ChecksumWriter(Path logPath) { + this.logPath = requireNonNull(logPath); + } + + /** Atomically writes {@code crcInfo} as the checksum file of its version; never overwrites. */ + public void writeCheckSum(Engine engine, CRCInfo crcInfo) throws IOException { + Path newChecksumPath = FileNames.checksumFile(logPath, crcInfo.getVersion()); + logger.info("Writing checksum file to path: {}", newChecksumPath); + wrapEngineExceptionThrowsIO( + () -> { + engine + .getJsonHandler() + .writeJsonFileAtomically( + newChecksumPath.toString(), + singletonCloseableIterator(crcInfo.toRow()), + false /* overwrite */); + logger.info("Write checksum file `{}` succeeds", newChecksumPath); + return null; + }, + "Write checksum file `%s`", + newChecksumPath); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/ChecksumSimpleHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/ChecksumSimpleHook.java new file mode 100644 index 00000000000..bb4d0e8c19f --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/ChecksumSimpleHook.java @@ -0,0 +1,53 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.hook; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.engine.Engine; +import io.delta.kernel.hook.PostCommitHook; +import io.delta.kernel.internal.checksum.CRCInfo; +import io.delta.kernel.internal.checksum.ChecksumWriter; +import io.delta.kernel.internal.fs.Path; +import java.io.IOException; + +/** + * A post-commit hook that writes a new checksum file at the version committed by the transaction. + * This hook performs a simple checksum operation without requiring previous checkpoint or log + * reading. 
+ */ +public class ChecksumSimpleHook implements PostCommitHook { + + private final CRCInfo crcInfo; + private final Path logPath; + + public ChecksumSimpleHook(CRCInfo crcInfo, Path logPath) { + this.crcInfo = requireNonNull(crcInfo); + this.logPath = requireNonNull(logPath); + } + + @Override + public void threadSafeInvoke(Engine engine) throws IOException { + checkArgument(engine != null); + new ChecksumWriter(logPath).writeCheckSum(engine, crcInfo); + } + + @Override + public PostCommitHookType getType() { + return PostCommitHookType.CHECKSUM_SIMPLE; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java deleted file mode 100644 index 85b1094f477..00000000000 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (2025) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.delta.kernel.internal.replay; - -import static java.util.Objects.requireNonNull; - -import io.delta.kernel.data.ColumnarBatch; -import io.delta.kernel.internal.actions.Metadata; -import io.delta.kernel.internal.actions.Protocol; -import io.delta.kernel.types.StructType; -import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CRCInfo { - private static final Logger logger = LoggerFactory.getLogger(CRCInfo.class); - - public static Optional fromColumnarBatch( - long version, ColumnarBatch batch, int rowId, String crcFilePath) { - Protocol protocol = Protocol.fromColumnVector(batch.getColumnVector(PROTOCOL_ORDINAL), rowId); - Metadata metadata = Metadata.fromColumnVector(batch.getColumnVector(METADATA_ORDINAL), rowId); - // protocol and metadata are nullable per fromColumnVector's implementation. - if (protocol == null || metadata == null) { - logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath); - return Optional.empty(); - } - return Optional.of(new CRCInfo(version, metadata, protocol)); - } - - // We can add additional fields later - public static final StructType FULL_SCHEMA = - new StructType().add("protocol", Protocol.FULL_SCHEMA).add("metadata", Metadata.FULL_SCHEMA); - - private static final int PROTOCOL_ORDINAL = 0; - private static final int METADATA_ORDINAL = 1; - - private final long version; - private final Metadata metadata; - private final Protocol protocol; - - protected CRCInfo(long version, Metadata metadata, Protocol protocol) { - this.version = version; - this.metadata = requireNonNull(metadata); - this.protocol = requireNonNull(protocol); - } - - /** The version of the Delta table that this CRCInfo represents. */ - public long getVersion() { - return version; - } - - /** The {@link Metadata} stored in this CRCInfo. */ - public Metadata getMetadata() { - return metadata; - } - - /** The {@link Protocol} stored in this CRCInfo. 
*/ - public Protocol getProtocol() { - return protocol; - } -} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 97cbf3ad637..ab4942185d8 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -28,6 +28,8 @@ import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.checkpoints.SidecarFile; +import io.delta.kernel.internal.checksum.CRCInfo; +import io.delta.kernel.internal.checksum.ChecksumReader; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.metrics.ScanMetrics; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotHint.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotHint.java index 5423d786b3f..76b9f653e21 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotHint.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotHint.java @@ -18,7 +18,7 @@ import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; -import io.delta.kernel.internal.replay.CRCInfo; +import io.delta.kernel.internal.checksum.CRCInfo; /** Contains summary information of a {@link io.delta.kernel.Snapshot}. 
*/ public class SnapshotHint { diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checksum/ChecksumWriterSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checksum/ChecksumWriterSuite.scala new file mode 100644 index 00000000000..1e73f99de19 --- /dev/null +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checksum/ChecksumWriterSuite.scala @@ -0,0 +1,197 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.checksum + +import java.util +import java.util.{Collections, Optional} + +import io.delta.kernel.data.Row +import io.delta.kernel.internal.actions.{Format, Metadata, Protocol} +import io.delta.kernel.internal.checksum.CRCInfo.CRC_FILE_SCHEMA +import io.delta.kernel.internal.data.GenericRow +import io.delta.kernel.internal.fs.Path +import io.delta.kernel.internal.util.VectorUtils +import io.delta.kernel.internal.util.VectorUtils.{stringArrayValue, stringStringMapValue} +import io.delta.kernel.test.{BaseMockJsonHandler, MockEngineUtils} +import io.delta.kernel.types.StructType +import io.delta.kernel.utils.CloseableIterator + +import org.scalatest.funsuite.AnyFunSuite + +/** + * Test suite for ChecksumWriter functionality. 
+ */ +class ChecksumWriterSuite extends AnyFunSuite with MockEngineUtils { + + private val FAKE_DELTA_LOG_PATH = new Path("/path/to/delta/log") + + // Schema field indices in crc file + private val TABLE_SIZE_BYTES_IDX = CRC_FILE_SCHEMA.indexOf("tableSizeBytes") + private val NUM_FILES_IDX = CRC_FILE_SCHEMA.indexOf("numFiles") + private val NUM_METADATA_IDX = CRC_FILE_SCHEMA.indexOf("numMetadata") + private val NUM_PROTOCOL_IDX = CRC_FILE_SCHEMA.indexOf("numProtocol") + private val TXN_ID_IDX = CRC_FILE_SCHEMA.indexOf("txnId") + private val METADATA_IDX = CRC_FILE_SCHEMA.indexOf("metadata") + private val PROTOCOL_IDX = CRC_FILE_SCHEMA.indexOf("protocol") + + test("write checksum") { + val jsonHandler = new MockCheckSumFileJsonWriter() + val checksumWriter = new ChecksumWriter(FAKE_DELTA_LOG_PATH) + val protocol = createTestProtocol() + val metadata = createTestMetadata() + + def testChecksumWrite(txn: Optional[String]): Unit = { + val version = 1L + val tableSizeBytes = 100L + val numFiles = 1L + + checksumWriter.writeCheckSum( + mockEngine(jsonHandler = jsonHandler), + new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txn)) + + verifyChecksumFile(jsonHandler, version) + verifyChecksumContent(jsonHandler.capturedCrcRow.get, tableSizeBytes, numFiles, txn) + verifyMetadataAndProtocol(jsonHandler.capturedCrcRow.get, metadata, protocol) + } + + // Test with and without transaction ID + testChecksumWrite(Optional.of("txn")) + testChecksumWrite(Optional.empty()) + } + + private def verifyChecksumFile(jsonHandler: MockCheckSumFileJsonWriter, version: Long): Unit = { + assert(jsonHandler.checksumFilePath == s"$FAKE_DELTA_LOG_PATH/${"%020d".format(version)}.crc") + assert(jsonHandler.capturedCrcRow.isDefined) + assert(jsonHandler.capturedCrcRow.get.getSchema == CRC_FILE_SCHEMA) + } + + private def verifyChecksumContent( + actualCheckSumRow: Row, + expectedTableSizeBytes: Long, + expectedNumFiles: Long, + expectedTxnId: Optional[String]): Unit = { + 
assert(!actualCheckSumRow.isNullAt(TABLE_SIZE_BYTES_IDX) && actualCheckSumRow.getLong( + TABLE_SIZE_BYTES_IDX) == expectedTableSizeBytes) + assert(!actualCheckSumRow.isNullAt( + NUM_FILES_IDX) && actualCheckSumRow.getLong(NUM_FILES_IDX) == expectedNumFiles) + assert(!actualCheckSumRow.isNullAt( + NUM_METADATA_IDX) && actualCheckSumRow.getLong(NUM_METADATA_IDX) == 1L) + assert(!actualCheckSumRow.isNullAt( + NUM_PROTOCOL_IDX) && actualCheckSumRow.getLong(NUM_PROTOCOL_IDX) == 1L) + + if (expectedTxnId.isPresent) { + assert(actualCheckSumRow.getString(TXN_ID_IDX) == expectedTxnId.get()) + } else { + assert(actualCheckSumRow.isNullAt(TXN_ID_IDX)) + } + } + + private def verifyMetadataAndProtocol( + actualRow: Row, + expectedMetadata: Metadata, + expectedProtocol: Protocol): Unit = { + checkMetadata(expectedMetadata, actualRow.getStruct(METADATA_IDX)) + checkProtocol(expectedProtocol, actualRow.getStruct(PROTOCOL_IDX)) + } + + // TODO: implement compare in Metadata and remove this method + private def checkMetadata(expectedMetadata: Metadata, actualMetadataRow: Row): Unit = { + assert(actualMetadataRow.getSchema == Metadata.FULL_SCHEMA) + + def getOptionalString(field: String): Optional[String] = + Optional.ofNullable(actualMetadataRow.getString(Metadata.FULL_SCHEMA.indexOf(field))) + + assert( + actualMetadataRow.getString(Metadata.FULL_SCHEMA.indexOf("id")) == expectedMetadata.getId) + assert(getOptionalString("name") == expectedMetadata.getName) + assert(getOptionalString("description") == expectedMetadata.getDescription) + + val formatRow = actualMetadataRow.getStruct(Metadata.FULL_SCHEMA.indexOf("format")) + assert( + formatRow + .getString( + Format.FULL_SCHEMA.indexOf("provider")) == expectedMetadata.getFormat.getProvider) + + assert( + actualMetadataRow + .getString( + Metadata.FULL_SCHEMA.indexOf("schemaString")) == expectedMetadata.getSchemaString) + assert( + actualMetadataRow + .getArray(Metadata.FULL_SCHEMA.indexOf("partitionColumns")) + == 
expectedMetadata.getPartitionColumns) + assert( + Optional + .ofNullable(actualMetadataRow.getLong(Metadata.FULL_SCHEMA.indexOf("createdTime"))) + == expectedMetadata.getCreatedTime) + assert( + VectorUtils + .toJavaMap(actualMetadataRow.getMap(Metadata.FULL_SCHEMA.indexOf("configuration"))) + == expectedMetadata.getConfiguration) + } + + // TODO: implement compare in Protocol and remove this method + private def checkProtocol(expectedProtocol: Protocol, actualProtocolRow: Row): Unit = { + assert(actualProtocolRow.getSchema == Protocol.FULL_SCHEMA) + assert( + expectedProtocol.getMinReaderVersion == actualProtocolRow + .getInt(Protocol.FULL_SCHEMA.indexOf("minReaderVersion"))) + assert( + expectedProtocol.getMinWriterVersion == actualProtocolRow + .getInt(Protocol.FULL_SCHEMA.indexOf("minWriterVersion"))) + } + + private def createTestMetadata(): Metadata = { + new Metadata( + "id", + Optional.of("name"), + Optional.of("description"), + new Format("parquet", Collections.emptyMap()), + "schemaString", + new StructType(), + stringArrayValue(util.Arrays.asList("c3")), + Optional.of(123), + stringStringMapValue(new util.HashMap[String, String]() { + put("delta.appendOnly", "true") + })) + } + + private def createTestProtocol(): Protocol = { + new Protocol( + /* minReaderVersion= */ 1, + /* minWriterVersion= */ 2, + Collections.emptySet(), + Collections.emptySet()) + } +} + +/** + * Mock implementation of JsonHandler for testing checksum file writing. 
+ */ +class MockCheckSumFileJsonWriter extends BaseMockJsonHandler { + var capturedCrcRow: Option[Row] = None + var checksumFilePath: String = "" + + override def writeJsonFileAtomically( + filePath: String, + data: CloseableIterator[Row], + overwrite: Boolean): Unit = { + checksumFilePath = filePath + assert(data.hasNext, "Expected data iterator to contain exactly one row") + capturedCrcRow = Some(data.next()) + assert(!data.hasNext, "Expected data iterator to contain exactly one row") + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumSimpleComparisonSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumSimpleComparisonSuite.scala new file mode 100644 index 00000000000..754a1572b00 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumSimpleComparisonSuite.scala @@ -0,0 +1,173 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults + +import java.io.File +import java.nio.file.Files +import java.util + +import scala.collection.immutable.Seq +import scala.jdk.CollectionConverters.setAsJavaSetConverter + +import io.delta.kernel.{Operation, Table} +import io.delta.kernel.data.Row +import io.delta.kernel.defaults.utils.TestUtils +import io.delta.kernel.engine.Engine +import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction +import io.delta.kernel.internal.TableImpl +import io.delta.kernel.internal.actions.{AddFile, SingleAction} +import io.delta.kernel.internal.checksum.{ChecksumReader, CRCInfo} +import io.delta.kernel.internal.fs.Path +import io.delta.kernel.internal.util.FileNames +import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.types.LongType.LONG +import io.delta.kernel.types.StructType +import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} + +import org.apache.spark.sql.functions.col + +/** + * Test suite to verify checksum file correctness by comparing + * Delta Spark and Delta Kernel generated checksum files. + * This suite ensures that both implementations generate consistent checksums + * for various table operations. 
+ */ +class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUtils { + + private val PARTITION_COLUMN = "part" + + test("create table, insert data and verify checksum") { + withTempDirAndEngine { (tablePath, engine) => + val sparkTablePath = tablePath + "/spark" + val kernelTablePath = tablePath + "/kernel" + + createTxn( + engine, + kernelTablePath, + isNewTable = true, + schema = new StructType().add("id", LONG), + partCols = Seq.empty).commit(engine, emptyIterable()) + .getPostCommitHooks + .forEach(hook => hook.threadSafeInvoke(engine)) + spark.sql(s"CREATE OR REPLACE TABLE delta.`${sparkTablePath}` (id LONG) USING DELTA") + assertChecksumEquals(engine, sparkTablePath, kernelTablePath, 0) + + (1 to 10).foreach { version => + spark.range(0, version).write.format("delta").mode("append").save(sparkTablePath) + commitSparkChangeToKernel(kernelTablePath, engine, sparkTablePath, version) + assertChecksumEquals(engine, sparkTablePath, kernelTablePath, version) + } + } + } + + test("create partitioned table, insert and verify checksum") { + withTempDirAndEngine { (tablePath, engine) => + val sparkTablePath = tablePath + "/spark" + val kernelTablePath = tablePath + "/kernel" + + createTxn( + engine, + kernelTablePath, + isNewTable = true, + schema = new StructType().add("id", LONG).add(PARTITION_COLUMN, LONG), + partCols = Seq(PARTITION_COLUMN)).commit(engine, emptyIterable()) + .getPostCommitHooks + .forEach(hook => hook.threadSafeInvoke(engine)) + spark.sql( + s"CREATE OR REPLACE TABLE delta.`${sparkTablePath}` " + + s"(id LONG, $PARTITION_COLUMN LONG) USING DELTA PARTITIONED BY ($PARTITION_COLUMN)") + assertChecksumEquals(engine, sparkTablePath, kernelTablePath, 0) + + (1 to 10).foreach { version => + spark.range(0, version).withColumn(PARTITION_COLUMN, col("id") % 2) + .write.format("delta").mode("append").save(sparkTablePath) + commitSparkChangeToKernel(kernelTablePath, engine, sparkTablePath, version) + assertChecksumEquals(engine, sparkTablePath, kernelTablePath, 
version) + } + } + } + + private def assertChecksumEquals( + engine: Engine, + sparkTablePath: String, + kernelTablePath: String, + version: Long): Unit = { + val sparkCrcPath = buildCrcPath(sparkTablePath, version) + val kernelCrcPath = buildCrcPath(kernelTablePath, version) + + assert( + Files.exists(sparkCrcPath) && Files.exists(kernelCrcPath), + s"CRC files not found for version $version") + + val sparkCrc = readCrcInfo(engine, sparkTablePath, version) + val kernelCrc = readCrcInfo(engine, kernelTablePath, version) + + assertCrcInfoEquals(sparkCrc, kernelCrc) + } + + private def readCrcInfo(engine: Engine, path: String, version: Long): CRCInfo = { + ChecksumReader + .getCRCInfo(engine, new Path(s"$path/_delta_log"), version, version) + .orElseThrow(() => new IllegalStateException(s"CRC info not found for version $version")) + } + + private def buildCrcPath(basePath: String, version: Long): java.nio.file.Path = { + new File(FileNames.checksumFile(new Path(s"$basePath/_delta_log"), version).toString).toPath + } + + // TODO: Add equals/hashCode to metadata, protocol then CRCInfo. 
+ private def assertCrcInfoEquals(crc1: CRCInfo, crc2: CRCInfo): Unit = { + assert(crc1.getVersion === crc2.getVersion) + assert(crc1.getNumFiles === crc2.getNumFiles) + assert(crc1.getTableSizeBytes === crc2.getTableSizeBytes) + assert(crc1.getMetadata.getSchema === crc2.getMetadata.getSchema) + assert(crc1.getMetadata.getPartitionColNames === crc2.getMetadata.getPartitionColNames) + } + + // Replays the ADD actions of Spark commit `versionToConvert` onto the kernel table at `path`. + private def commitSparkChangeToKernel( + path: String, + engine: Engine, + sparkTablePath: String, + versionToConvert: Long): Unit = { + + val txn = Table.forPath(engine, path) + .createTransactionBuilder(engine, "test-engine", Operation.WRITE) + .build(engine) + + val tableChange = Table.forPath(engine, sparkTablePath).asInstanceOf[TableImpl].getChanges( + engine, + versionToConvert, + versionToConvert, + // TODO include REMOVE action as well once we support it + Set(DeltaAction.ADD).asJava) + + val addFilesRows = new util.ArrayList[Row]() + tableChange.forEach(batch => + batch.getRows.forEach(row => { + val addIndex = row.getSchema.indexOf("add") + if (!row.isNullAt(addIndex)) { + addFilesRows.add( + SingleAction.createAddFileSingleAction(new AddFile(row.getStruct(addIndex)).toRow)) + } + })) + + txn + .commit(engine, inMemoryIterable(toCloseableIterator(addFilesRows.iterator()))) + .getPostCommitHooks + .forEach(_.threadSafeInvoke(engine)) + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index b39c20d3c87..7b581beaeb1 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -43,9 +43,8 @@ import io.delta.kernel.internal.util.Utils.singletonCloseableIterator import io.delta.kernel.internal.util.Utils.toCloseableIterator import 
io.delta.kernel.types.IntegerType.INTEGER import io.delta.kernel.types.StructType +import io.delta.kernel.utils.{CloseableIterable, CloseableIterator, FileStatus} import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} -import io.delta.kernel.utils.CloseableIterator -import io.delta.kernel.utils.FileStatus import org.apache.spark.sql.delta.VersionNotFoundException @@ -320,7 +319,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { } val combineActions = inMemoryIterable(actions.reduceLeft(_ combine _)) - txn.commit(engine, combineActions) + commitTransaction(txn, engine, combineActions) } def appendData( @@ -381,7 +380,10 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { assertMetadataProp(snapshot, key, expectedValue) } - def verifyWrittenContent(path: String, expSchema: StructType, expData: Seq[TestRow]): Unit = { + protected def verifyWrittenContent( + path: String, + expSchema: StructType, + expData: Seq[TestRow]): Unit = { val actSchema = tableSchema(path) assert(actSchema === expSchema) @@ -460,4 +462,15 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { .stream() .anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT) === isReadyForCheckpoint) } + + /** + * Commits the transaction. All child suites should call this instead of txn.commit + * directly; they may override it for specific test cases (e.g. commit and write CRC). 
+ */ + protected def commitTransaction( + txn: Transaction, + engine: Engine, + dataActions: CloseableIterable[Row]): TransactionCommitResult = { + txn.commit(engine, dataActions) + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteWithCrcSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteWithCrcSuite.scala new file mode 100644 index 00000000000..53750cbdbbb --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteWithCrcSuite.scala @@ -0,0 +1,67 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults +import scala.collection.immutable.Seq +import scala.language.implicitConversions + +import io.delta.kernel.{Transaction, TransactionCommitResult} +import io.delta.kernel.data.Row +import io.delta.kernel.defaults.utils.TestRow +import io.delta.kernel.engine.Engine +import io.delta.kernel.hook.PostCommitHook.PostCommitHookType +import io.delta.kernel.internal.checksum.ChecksumReader +import io.delta.kernel.internal.fs.Path +import io.delta.kernel.types.StructType +import io.delta.kernel.utils.CloseableIterable + +/** + * Test suite that runs all tests in DeltaTableWritesSuite with CRC file written + * after each delta commit. This test suite will verify that the written CRC files are valid. 
+ */ +class DeltaTableWriteWithCrcSuite extends DeltaTableWritesSuite { + + override def commitTransaction( + txn: Transaction, + engine: Engine, + dataActions: CloseableIterable[Row]): TransactionCommitResult = { + val result = txn.commit(engine, dataActions) + result.getPostCommitHooks + .stream() + .filter(hook => hook.getType == PostCommitHookType.CHECKSUM_SIMPLE) + .forEach(hook => hook.threadSafeInvoke(engine)) + result + } + + override def verifyWrittenContent( + path: String, + expSchema: StructType, + expData: Seq[TestRow]): Unit = { + super.verifyWrittenContent(path, expSchema, expData) + verifyChecksumValid(path) + } + + /** Asserts that ChecksumReader can load a CRC file for the table's latest version. */ + def verifyChecksumValid( + tablePath: String): Unit = { + val checksumVersion = latestSnapshot(tablePath, defaultEngine).getVersion + val crcInfo = ChecksumReader.getCRCInfo( + defaultEngine, + new Path(s"$tablePath/_delta_log/"), + checksumVersion, + checksumVersion) + assert(crcInfo.isPresent, s"No readable CRC file at version $checksumVersion in $tablePath") + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 57c00d0ab60..1620ffc3dec 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -48,6 +48,7 @@ import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} import io.delta.kernel.utils.CloseableIterable import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} +/** Transaction commit in this suite IS REQUIRED TO use commitTransaction rather than .commit */ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { /////////////////////////////////////////////////////////////////////////// @@ -99,7 +100,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with 
ParquetSuiteBa val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE) val txn = txnBuilder.withSchema(engine, testSchema).build(engine) - txn.commit(engine, emptyIterable()) + commitTransaction(txn, engine, emptyIterable()) { val ex = intercept[TableAlreadyExistsException] { @@ -133,7 +134,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(txn.getSchema(engine) === testSchema) assert(txn.getPartitionColumns(engine) === Seq.empty.asJava) - val txnResult = txn.commit(engine, emptyIterable()) + val txnResult = commitTransaction(txn, engine, emptyIterable()) assert(txnResult.getVersion === 0) assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) @@ -349,7 +350,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(txn.getSchema(engine) === schema) // Expect the partition column name is exactly same as the one in the schema assert(txn.getPartitionColumns(engine) === Seq("Part1", "part2").asJava) - val txnResult = txn.commit(engine, emptyIterable()) + val txnResult = commitTransaction(txn, engine, emptyIterable()) assert(txnResult.getVersion === 0) assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) @@ -367,7 +368,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val table = Table.forPath(engine, tablePath) val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE) val txn = txnBuilder.withSchema(engine, schema).build(engine) - val txnResult = txn.commit(engine, emptyIterable()) + val txnResult = commitTransaction(txn, engine, emptyIterable()) assert(txnResult.getVersion === 0) assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) @@ -437,12 +438,12 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val stagedFiles = stageData(txnState, Map.empty, dataBatches1) val stagedActionsIterable = inMemoryIterable(stagedFiles) - val 
commitResult = txn.commit(engine, stagedActionsIterable) + val commitResult = commitTransaction(txn, engine, stagedActionsIterable) assert(commitResult.getVersion == 0) // try to commit the same transaction and expect failure val ex = intercept[IllegalStateException] { - txn.commit(engine, stagedActionsIterable) + commitTransaction(txn, engine, stagedActionsIterable) } assert(ex.getMessage.contains( "Transaction is already attempted to commit. Create a new transaction.")) @@ -564,7 +565,10 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa // partition cols are not written in the commit info for inserts val partitionBy = if (i == 0) expPartCols else null verifyCommitInfo(tblPath, version = i, partitionBy, operation = WRITE) - verifyWrittenContent(tblPath, schema, if (i == 0) expV0Data else expV0Data ++ expV1Data) + verifyWrittenContent( + tblPath, + schema, + if (i == 0) expV0Data else expV0Data ++ expV1Data) } } } @@ -798,6 +802,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa test("insert into table - idempotent writes") { withTempDirAndEngine { (tblPath, engine) => + // TODO: re-enable when CRC_FULL post commit hook is added, txn2 requires CRC_FULL + assume(this.suiteName != ("DeltaTableWriteWithCrcSuite")) val data = Seq(Map("part1" -> ofInt(1), "part2" -> ofInt(2)) -> dataPartitionBatches1) var expData = Seq.empty[TestRow] // as the data in inserted, update this. 
@@ -826,7 +832,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa txn: Transaction, actions: CloseableIterable[Row], expTblVer: Long): Unit = { - val commitResult = txn.commit(engine, actions) + val commitResult = commitTransaction(txn, engine, actions) expData = expData ++ data.flatMap(_._2).flatMap(_.toTestRows) @@ -895,6 +901,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa test("conflicts - creating new table - table created by other txn after current txn start") { withTempDirAndEngine { (tablePath, engine) => + // TODO: re-enable when CRC_FULL post commit hook is added + assume(this.suiteName != ("DeltaTableWriteWithCrcSuite")) val losingTx = createTestTxn(engine, tablePath, Some(testSchema)) // don't commit losingTxn, instead create a new txn and commit it @@ -943,6 +951,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa // Different scenarios that have multiple winning txns and with a checkpoint in between. Seq(1, 5, 12).foreach { numWinningTxs => test(s"conflicts - concurrent data append ($numWinningTxs) after the losing txn has started") { + // TODO: re-enable when CRC_FULL post commit hook is added + assume(this.suiteName != ("DeltaTableWriteWithCrcSuite")) withTempDirAndEngine { (tablePath, engine) => val testData = Seq(Map.empty[String, Literal] -> dataBatches1) var expData = Seq.empty[TestRow]