Skip to content

Commit

Permalink
#207 QbeastFilterPushdownTest is improved
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeiakimov committed Nov 24, 2023
1 parent 04f517e commit 2d39821
Showing 1 changed file with 10 additions and 15 deletions.
25 changes: 10 additions & 15 deletions src/test/scala/io/qbeast/spark/utils/QbeastFilterPushdownTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
private val filter_user_equal = "(user_id = 536764969)"
private val filter_product_equal = "(product_id = 11522682)"

private def checkFileFiltering(query: DataFrame): Unit = {
private def checkFiltersArePushedDown(query: DataFrame): Unit = {
val leaves =
query.queryExecution.executedPlan.collectLeaves().filter(_.isInstanceOf[FileSourceScanExec])

Expand All @@ -32,13 +32,8 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
leaves
.foreach {
case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] =>
val index = f.relation.location
val matchingFiles =
index.listFiles(f.partitionFilters, f.dataFilters).flatMap(_.files)
val allFiles = index.inputFiles
matchingFiles.length shouldBe <=(allFiles.length)
f.dataFilters.nonEmpty shouldBe true
}

}

private def checkLogicalFilterPushdown(sqlFilters: Seq[String], query: DataFrame): Unit = {
Expand Down Expand Up @@ -70,7 +65,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
val qbeastQuery = df.filter(filter)
val normalQuery = data.filter(filter)

checkFileFiltering(qbeastQuery)
checkFiltersArePushedDown(qbeastQuery)
qbeastQuery.count() shouldBe normalQuery.count()
assertLargeDatasetEquality(qbeastQuery, normalQuery, orderedComparison = false)

Expand All @@ -96,7 +91,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
val qbeastQuery = df.filter(filter)
val normalQuery = data.filter(filter)

checkFileFiltering(qbeastQuery)
checkFiltersArePushedDown(qbeastQuery)
qbeastQuery.count() shouldBe normalQuery.count()
assertLargeDatasetEquality(qbeastQuery, normalQuery, orderedComparison = false)

Expand All @@ -119,7 +114,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
val qbeastQuery = df.filter(filter)
val normalQuery = data.filter(filter)

checkFileFiltering(qbeastQuery)
checkFiltersArePushedDown(qbeastQuery)
qbeastQuery.count() shouldBe normalQuery.count()
assertLargeDatasetEquality(qbeastQuery, normalQuery, orderedComparison = false)

Expand All @@ -143,7 +138,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
val normalQuery = data.filter(filter)

// The file filtering should not be applied in this particular case
checkFileFiltering(qbeastQuery)
checkFiltersArePushedDown(qbeastQuery)
assertLargeDatasetEquality(qbeastQuery, normalQuery, orderedComparison = false)

}
Expand All @@ -167,7 +162,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
val qbeastQuery = df.filter(filter)
val normalQuery = dataWithNulls.filter(filter)

checkFileFiltering(qbeastQuery)
checkFiltersArePushedDown(qbeastQuery)
qbeastQuery.count() shouldBe normalQuery.count()
assertLargeDatasetEquality(qbeastQuery, normalQuery, orderedComparison = false)

Expand Down Expand Up @@ -236,7 +231,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {

// OR filters are not split, so we need to match them entirely
checkLogicalFilterPushdown(Seq(filter), query)
checkFileFiltering(query)
checkFiltersArePushedDown(query)
assertSmallDatasetEquality(query, originalQuery, orderedComparison = false)

}
Expand All @@ -259,7 +254,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {

// OR filters are not split, so we need to match them entirely
checkLogicalFilterPushdown(Seq(filter), query)
checkFileFiltering(query)
checkFiltersArePushedDown(query)
assertSmallDatasetEquality(query, originalQuery, orderedComparison = false)

}
Expand All @@ -281,7 +276,7 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {

// OR filters are not split, so we need to match them entirely
checkLogicalFilterPushdown(Seq(filter), query)
checkFileFiltering(query)
checkFiltersArePushedDown(query)
assertSmallDatasetEquality(query, originalQuery, orderedComparison = false)

}
Expand Down

0 comments on commit 2d39821

Please sign in to comment.