Merge remote-tracking branch 'origin/main' into 207-rollup-compaction
alexeiakimov committed Sep 5, 2023
2 parents 069dc9f + f9c7ab0 commit 73c56ac
Showing 2 changed files with 35 additions and 20 deletions.
32 changes: 17 additions & 15 deletions src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala
@@ -5,6 +5,7 @@ package io.qbeast.spark.index.query

import io.qbeast.core.model._
import org.apache.spark.sql.catalyst.expressions.{
+ Attribute,
EqualTo,
Expression,
GreaterThan,
@@ -56,7 +57,8 @@ private[spark] class QuerySpecBuilder(sparkFilters: Seq[Expression])
conjunctiveSplit.flatMap(transformInExpressions)

// Filter those that involve any Qbeast Indexed Column
- val queryFilters = transformedFilters.filter(hasQbeastColumnReference(_, indexedColumns))
+ val queryFilters = transformedFilters
+   .filter(hasQbeastColumnReference(_, indexedColumns))

QbeastFilters(weightFilters, queryFilters)
}
@@ -88,24 +90,24 @@ private[spark] class QuerySpecBuilder(sparkFilters: Seq[Expression])
// if not found, use the overall coordinates
val columnFrom = columnFilters
.collectFirst {
- case GreaterThan(_, l: Literal) => sparkTypeToCoreType(l)
- case GreaterThanOrEqual(_, l: Literal) => sparkTypeToCoreType(l)
- case LessThan(l: Literal, _) => sparkTypeToCoreType(l)
- case LessThanOrEqual(l: Literal, _) => sparkTypeToCoreType(l)
- case EqualTo(_, l: Literal) => sparkTypeToCoreType(l)
- case EqualTo(l: Literal, _) => sparkTypeToCoreType(l)
- case IsNull(_) => null
+ case GreaterThan(_: Attribute, l: Literal) => sparkTypeToCoreType(l)
+ case GreaterThanOrEqual(_: Attribute, l: Literal) => sparkTypeToCoreType(l)
+ case LessThan(l: Literal, _: Attribute) => sparkTypeToCoreType(l)
+ case LessThanOrEqual(l: Literal, _: Attribute) => sparkTypeToCoreType(l)
+ case EqualTo(_: Attribute, l: Literal) => sparkTypeToCoreType(l)
+ case EqualTo(l: Literal, _: Attribute) => sparkTypeToCoreType(l)
+ case IsNull(_: Attribute) => null
}

val columnTo = columnFilters
.collectFirst {
- case LessThan(_, l: Literal) => sparkTypeToCoreType(l)
- case LessThanOrEqual(_, l: Literal) => sparkTypeToCoreType(l)
- case GreaterThan(l: Literal, _) => sparkTypeToCoreType(l)
- case GreaterThanOrEqual(l: Literal, _) => sparkTypeToCoreType(l)
- case EqualTo(_, l: Literal) => sparkTypeToCoreType(l)
- case EqualTo(l: Literal, _) => sparkTypeToCoreType(l)
- case IsNull(_) => null
+ case LessThan(_: Attribute, l: Literal) => sparkTypeToCoreType(l)
+ case LessThanOrEqual(_: Attribute, l: Literal) => sparkTypeToCoreType(l)
+ case GreaterThan(l: Literal, _: Attribute) => sparkTypeToCoreType(l)
+ case GreaterThanOrEqual(l: Literal, _: Attribute) => sparkTypeToCoreType(l)
+ case EqualTo(_: Attribute, l: Literal) => sparkTypeToCoreType(l)
+ case EqualTo(l: Literal, _: Attribute) => sparkTypeToCoreType(l)
+ case IsNull(_: Attribute) => null
}

(columnFrom, columnTo)
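The effect of the added Attribute guards above: only comparisons between a plain column reference and a literal now contribute range bounds, while predicates built on derived expressions (such as regexp_extract over an indexed column) match none of the cases and yield no bound, so the builder falls back to the overall coordinates. A minimal, self-contained sketch of that behaviour (hypothetical demo code, not part of this commit; it uses only Spark Catalyst expression classes):

  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.types.StringType

  object AttributeGuardSketch extends App {
    // A plain indexed column and two candidate filters over it.
    val brand: Attribute = AttributeReference("brand_mod", StringType)()
    val plainFilter: Expression = EqualTo(brand, Literal("kipardo"))
    val derivedFilter: Expression = EqualTo(
      RegExpExtract(brand, Literal("prefix_(.+)"), Literal(1)),
      Literal("kipardo"))

    // Same pattern shape as the updated QuerySpecBuilder cases: the non-literal
    // side must be an Attribute, not an arbitrary expression.
    def boundFor(filter: Expression): Option[Literal] =
      Seq(filter).collectFirst {
        case EqualTo(_: Attribute, l: Literal) => l
        case EqualTo(l: Literal, _: Attribute) => l
      }

    println(boundFor(plainFilter))   // Some(kipardo): the literal becomes a query bound
    println(boundFor(derivedFilter)) // None: no bound, the overall coordinates are used
  }

Before this change, the untyped wildcard patterns (e.g. EqualTo(_, l: Literal)) would also have matched derivedFilter and produced a spurious bound for the indexed column.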
@@ -292,14 +292,14 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {
}
}

it should "pushdown regex expressions on strings" in withQbeastContextSparkAndTmpDir {
it should "NOT pushdown regex expressions on strings" in withQbeastContextSparkAndTmpDir {
(spark, tmpDir) =>
{
val data = loadTestData(spark)
val columnName = "brand_mod"
val modifiedData =
data
-   .withColumn(columnName, regexp_replace(col("brand"), "versace", "prefix_versace"))
+   .withColumn(columnName, regexp_replace(col("brand"), "kipardo", "prefix_kipardo"))

// Index data with the new column
writeTestData(modifiedData, Seq(columnName), 10000, tmpDir)
@@ -308,14 +308,27 @@ class QbeastFilterPushdownTest extends QbeastIntegrationTestSpec {

val regexExpression = "prefix_(.+)"
val filter =
s"(regexp_extract($columnName, '$regexExpression', 1) = 'versace')"
s"(regexp_extract($columnName, '$regexExpression', 1) = 'kipardo')"
val query = df.filter(filter)
val originalQuery = modifiedData.filter(filter)
- val originalQueryWithoutRegex = data.filter("brand = 'versace'")
+ val originalQueryWithoutRegex = data.filter("brand = 'kipardo'")

// OR filters are not split, so we need to match them entirely
checkLogicalFilterPushdown(Seq(filter), query)
checkFileFiltering(query)

+ // Check that the files returned by the query are the same as the files existing in the folder
+ query.queryExecution.executedPlan
+   .collectLeaves()
+   .filter(_.isInstanceOf[FileSourceScanExec])
+   .foreach {
+     case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] =>
+       val index = f.relation.location
+       val matchingFiles =
+         index.listFiles(f.partitionFilters, f.dataFilters).flatMap(_.files)
+       val allFiles = index.inputFiles
+       matchingFiles.length shouldBe allFiles.length
+   }

assertSmallDatasetEquality(
query,
originalQuery,
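For context on the new assertion in the test above: it walks the executed plan's leaf scans and checks that the files the Qbeast index returns for the pushed partition/data filters are exactly its input files, i.e. the regexp_extract predicate prunes nothing. A condensed sketch of the same check as a standalone helper (hypothetical name; the OTreeIndex import path is assumed from the qbeast-spark layout, and plain assert stands in for the ScalaTest matcher used in the suite):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.FileSourceScanExec
  import io.qbeast.spark.delta.OTreeIndex // assumed package of the Qbeast file index

  object FilePruningChecks {
    // Hypothetical helper: assert that index-level file pruning skipped nothing,
    // i.e. every file known to the OTreeIndex is read by the scan.
    def assertNoFilePruning(query: DataFrame): Unit = {
      query.queryExecution.executedPlan
        .collectLeaves()
        .collect { case f: FileSourceScanExec if f.relation.location.isInstanceOf[OTreeIndex] => f }
        .foreach { f =>
          val index = f.relation.location
          // Files surviving filter pushdown vs. all files tracked by the index
          val matchingFiles = index.listFiles(f.partitionFilters, f.dataFilters).flatMap(_.files)
          val allFiles = index.inputFiles
          assert(
            matchingFiles.length == allFiles.length,
            s"expected no pruning, but ${allFiles.length - matchingFiles.length} file(s) were skipped")
        }
    }
  }

Usage in a test body would then be a single call, FilePruningChecks.assertNoFilePruning(query), mirroring the inline foreach block added in this commit.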
