diff --git a/README.md b/README.md index d86ea5e8a..5f25f57eb 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ [![Notebook](https://img.shields.io/badge/_-Jupyter_Notebook_example-0053B3?style=for-the-badge&logo=jupyter)](./docs/sample_pushdown_demo.ipynb)
[![Slack](https://img.shields.io/badge/_-Slack-blue?style=for-the-badge&logo=slack)](https://join.slack.com/t/qbeast-users/shared_invite/zt-w0zy8qrm-tJ2di1kZpXhjDq_hAl1LHw) -[![Medium](https://img.shields.io/badge/_-Medium-yellowgreen?style=for-the-badge&logo=medium)](https://blog.qbeast.io/) +[![Academy](https://img.shields.io/badge/_-Medium-yellowgreen?style=for-the-badge&logo=medium)](https://qbeast.io/academy-courses-index/) [![Website](https://img.shields.io/badge/_-Website-dc005f?style=for-the-badge&logo=)](https://qbeast.io) --- diff --git a/src/main/scala/io/qbeast/spark/index/query/QueryFiltersUtils.scala b/src/main/scala/io/qbeast/spark/index/query/QueryFiltersUtils.scala index e5226eb82..901c33acf 100644 --- a/src/main/scala/io/qbeast/spark/index/query/QueryFiltersUtils.scala +++ b/src/main/scala/io/qbeast/spark/index/query/QueryFiltersUtils.scala @@ -7,10 +7,15 @@ import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, getZoneId} import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.execution.InSubqueryExec +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DateType, TimestampType} import org.apache.spark.unsafe.types.UTF8String +import java.util.concurrent.TimeUnit + private[query] trait QueryFiltersUtils { lazy val spark: SparkSession = SparkSession.active @@ -91,15 +96,25 @@ private[query] trait QueryFiltersUtils { } /** - * Convert an Spark String type to a Scala core type - * @param value the value to convert + * Convert a Literal value from Spark to a Qbeast/Scala core type + * @param l the Literal to convert * @return */ - def sparkTypeToCoreType(value: Any): Any = { - value match { - case s: UTF8String => s.toString - case _ => value + def sparkTypeToCoreType(l: Literal): Any = { + + (l.value, l.dataType) match { + case (int: Integer, _: DateType) => + // convert DateType to Milliseconds + lazy val zoneId = getZoneId(SQLConf.get.sessionLocalTimeZone) + val dateInMicros = daysToMicros(int, zoneId) + val dateInMillis = TimeUnit.MICROSECONDS.toMillis(dateInMicros) + dateInMillis + case (long: Long, _: TimestampType) => + // convert Timestamp from Microseconds to Milliseconds + TimeUnit.MICROSECONDS.toMillis(long) + case (s: UTF8String, _) => s.toString + case _ => l.value } } diff --git a/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala b/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala index 941cdd2b1..30aa07932 100644 --- a/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala +++ b/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala @@ -88,23 +88,23 @@ private[spark] class QuerySpecBuilder(sparkFilters: Seq[Expression]) // if not found, use the overall coordinates val columnFrom = columnFilters .collectFirst { - case GreaterThan(_, Literal(value, _)) => sparkTypeToCoreType(value) - case GreaterThanOrEqual(_, Literal(value, _)) => sparkTypeToCoreType(value) - case LessThan(Literal(value, _), _) => sparkTypeToCoreType(value) - case LessThanOrEqual(Literal(value, _), _) => sparkTypeToCoreType(value) - case EqualTo(_, Literal(value, _)) => sparkTypeToCoreType(value) - case EqualTo(Literal(value, _), _) => sparkTypeToCoreType(value) + case GreaterThan(_, l: Literal) => sparkTypeToCoreType(l) + case GreaterThanOrEqual(_, l: Literal) => sparkTypeToCoreType(l) + case LessThan(l: Literal, _) => sparkTypeToCoreType(l) + case LessThanOrEqual(l: Literal, _) => sparkTypeToCoreType(l) + case EqualTo(_, l: Literal) => sparkTypeToCoreType(l) + case EqualTo(l: Literal, _) => sparkTypeToCoreType(l) case IsNull(_) => null } val columnTo = columnFilters .collectFirst { - case LessThan(_, Literal(value, _)) => sparkTypeToCoreType(value) - case LessThanOrEqual(_, Literal(value, _)) => sparkTypeToCoreType(value) - case GreaterThan(Literal(value, _), _) => sparkTypeToCoreType(value) - case GreaterThanOrEqual(Literal(value, _), _) => sparkTypeToCoreType(value) - case EqualTo(_, Literal(value, _)) => sparkTypeToCoreType(value) - case EqualTo(Literal(value, _), _) => sparkTypeToCoreType(value) + case LessThan(_, l: Literal) => sparkTypeToCoreType(l) + case LessThanOrEqual(_, l: Literal) => sparkTypeToCoreType(l) + case GreaterThan(l: Literal, _) => sparkTypeToCoreType(l) + case GreaterThanOrEqual(l: Literal, _) => sparkTypeToCoreType(l) + case EqualTo(_, l: Literal) => sparkTypeToCoreType(l) + case EqualTo(l: Literal, _) => sparkTypeToCoreType(l) case IsNull(_) => null } diff --git a/src/test/scala/io/qbeast/spark/index/query/TimeSeriesQueryTest.scala b/src/test/scala/io/qbeast/spark/index/query/TimeSeriesQueryTest.scala new file mode 100644 index 000000000..c35f0675c --- /dev/null +++ b/src/test/scala/io/qbeast/spark/index/query/TimeSeriesQueryTest.scala @@ -0,0 +1,95 @@ +package io.qbeast.spark.index.query + +import io.qbeast.spark.{QbeastIntegrationTestSpec} +import org.apache.spark.sql.functions.{to_date, to_timestamp, unix_timestamp} + +class TimeSeriesQueryTest extends QbeastIntegrationTestSpec with QueryTestSpec { + + "Qbeast Format" should "filter correctly values with Timestamp" in withSparkAndTmpDir( + (spark, tmpDir) => { + import spark.implicits._ + val df = + Seq( + "2017-01-03 12:02:00", + "2017-01-02 12:02:00", + "2017-01-02 12:02:00", + "2017-01-02 12:02:00", + "2017-01-01 12:02:00", + "2017-01-01 12:02:00") + .toDF("date") + .withColumn("date", to_timestamp($"date")) + + df.write.format("qbeast").option("columnsToIndex", "date").save(tmpDir) + + val indexed = spark.read.format("qbeast").load(tmpDir) + + indexed.filter("date == '2017-01-02 12:02:00'").count() shouldBe 3 + + indexed.filter("date > '2017-01-02 12:02:00'").count() shouldBe 1 + + indexed.filter("date < '2017-01-02 12:02:00'").count() shouldBe 2 + + }) + + it should "filter correctly values with Date" in withSparkAndTmpDir((spark, tmpDir) => { + import spark.implicits._ + val df = + Seq("2017-01-01", "2017-01-02", "2017-01-03", "2017-01-04") + .toDF("date") + .withColumn("date", to_date($"date")) + + df.write.format("qbeast").option("columnsToIndex", "date").save(tmpDir) + + val indexed = spark.read.format("qbeast").load(tmpDir) + + indexed.filter("date == '2017-01-03'").count() shouldBe 1 + + }) + + it should "filter correctly values with complex Dates" in withSparkAndTmpDir( + (spark, tmpDir) => { + import spark.implicits._ + val df = + Seq( + "2017-01-03 12:02:00", + "2017-01-02 12:02:00", + "2017-01-02 12:02:00", + "2017-01-02 12:02:00", + "2017-01-01 12:02:00", + "2017-01-01 12:02:00") + .toDF("date") + .withColumn("date", to_date($"date")) + + df.write.format("qbeast").option("columnsToIndex", "date").save(tmpDir) + + val indexed = spark.read.format("qbeast").load(tmpDir) + + indexed.filter("date == '2017-01-02 12:02:00'").count() shouldBe 3 + + }) + + it should "filter correctly values with Unix Timestamp" in withSparkAndTmpDir( + (spark, tmpDir) => { + import spark.implicits._ + val df = + Seq( + "2017-01-03 12:02:00", + "2017-01-02 12:02:00", + "2017-01-02 12:02:00", + "2017-01-02 12:02:00", + "2017-01-01 12:02:00", + "2017-01-01 12:02:00") + .toDF("date") + .withColumn("date", unix_timestamp($"date")) + + df.write.format("qbeast").option("columnsToIndex", "date").save(tmpDir) + + val indexed = spark.read.format("qbeast").load(tmpDir) + val valueToFilter = df.first().getLong(0) + + indexed.filter(s"date == $valueToFilter").count() shouldBe df + .filter(s"date == $valueToFilter") + .count() + + }) +}