Skip to content

Commit

Permalink
Eliminate deletes and updates tests
Browse files Browse the repository at this point in the history
  • Loading branch information
osopardo1 committed Oct 24, 2024
1 parent b026c8f commit dc52aea
Showing 1 changed file with 0 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.qbeast.spark.utils

import io.delta.tables.DeltaTable
import io.qbeast.core.model.IndexFile
import io.qbeast.core.model.QTableID
import io.qbeast.spark.delta.DeltaQbeastSnapshot
Expand Down Expand Up @@ -250,41 +249,6 @@ class QbeastOptimizeIntegrationTest extends QbeastIntegrationTestSpec {
checkLatestRevisionAfterOptimize(spark, qtableID)
}

it should "optimize a table with Deletes" in withQbeastContextSparkAndTmpDir {
  (spark, tmpDir) =>
    // Write ids 0..49 as a Qbeast table indexed on "id"
    val source = spark.range(50)
    source.write
      .mode("append")
      .format("qbeast")
      .option("columnsToIndex", "id")
      .save(tmpDir)

    // Remove ids 2, 3 and 4 through the Delta API
    DeltaTable.forPath(spark, tmpDir).delete("id > 1 and id < 5")

    // The delete is expected to leave at least one unindexed file behind
    val qtableID = QTableID(tmpDir)
    val unindexedBeforeOptimize = getUnindexedFilesFromDelta(qtableID)
    unindexedBeforeOptimize should not be empty

    // Run the optimization
    QbeastTable.forPath(spark, tmpDir).optimize(0L)

    // Once optimized, no unindexed files should remain
    val unindexedAfterOptimize = getUnindexedFilesFromDelta(qtableID)
    unindexedAfterOptimize shouldBe empty

    // Validate the latest revision
    checkLatestRevisionAfterOptimize(spark, QTableID(tmpDir))

    // 50 rows written minus the 3 deleted ones
    val remainingRows = spark.read.format("qbeast").load(tmpDir).count()
    remainingRows shouldBe 47
}

it should "Optimize a fraction of the Staging Area" in withQbeastContextSparkAndTmpDir {
(spark, tmpDir) =>
// Index with Qbeast
Expand Down Expand Up @@ -333,37 +297,4 @@ class QbeastOptimizeIntegrationTest extends QbeastIntegrationTestSpec {
unindexedFiles2 shouldBe empty
}

it should "optimize a table with updates" in withQbeastContextSparkAndTmpDir {
  (spark, tmpDir) =>
    // Index ids 0..49 with Qbeast
    spark
      .range(50)
      .write
      .mode("append")
      .format("qbeast")
      .option("columnsToIndex", "id")
      .save(tmpDir)

    // Update data with DeltaTable: rewrite id 4 as id 5.
    // Long literals are used so that neither the predicate nor the assigned value
    // depends on an implicit string-to-number cast against the numeric "id" column.
    val deltaTable = DeltaTable.forPath(spark, tmpDir)
    deltaTable.update(col("id") === 4L, Map("id" -> lit(5L)))

    // The update is expected to leave exactly one unindexed file behind
    val qtableID = QTableID(tmpDir)
    val firstUnindexedFiles = getUnindexedFilesFromDelta(qtableID)
    firstUnindexedFiles.count() shouldBe 1

    // Optimize the Table
    val qt = QbeastTable.forPath(spark, tmpDir)
    qt.optimize(0L)

    // After optimization, all files from the Update Operation should be indexed
    val unindexedFiles = getUnindexedFilesFromDelta(qtableID)
    unindexedFiles shouldBe empty

    // The original id 5 plus the updated row must both be readable through Qbeast
    val qbeastDF = spark.read.format("qbeast").load(tmpDir)
    qbeastDF.filter("id == 5").count() shouldBe 2
}

}

0 comments on commit dc52aea

Please sign in to comment.