diff --git a/README.md b/README.md
index d86ea5e8a..5f25f57eb 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
 [![Notebook](https://img.shields.io/badge/_-Jupyter_Notebook_example-0053B3?style=for-the-badge&logo=jupyter)](./docs/sample_pushdown_demo.ipynb)
 [![Slack](https://img.shields.io/badge/_-Slack-blue?style=for-the-badge&logo=slack)](https://join.slack.com/t/qbeast-users/shared_invite/zt-w0zy8qrm-tJ2di1kZpXhjDq_hAl1LHw)
-[![Medium](https://img.shields.io/badge/_-Medium-yellowgreen?style=for-the-badge&logo=medium)](https://blog.qbeast.io/)
+[![Academy](https://img.shields.io/badge/_-Academy-yellowgreen?style=for-the-badge&logo=medium)](https://qbeast.io/academy-courses-index/)
 [![Website](https://img.shields.io/badge/_-Website-dc005f?style=for-the-badge&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAE0AAABNCAMAAADU1xmCAAAABGdBTUEAALGPC/xhBQAAACBjSFJNAAB6JgAAgIQAAPoAAACA6AAAdTAAAOpgAAA6mAAAF3CculE8AAAC6FBMVEVHcEyCvU6qx1Ckx1CatGSNwk+qxlDmlouPuJHMwE/bvn7evU/geImbyVDxrDn2ulzjQYTkQYXjQYTjQYTjQYTjQYTjQYTjQYTjQYTjQYTjQYTjQYTeR4ihx1D3tlb3sUf4qkH3slbysETmQofjQYTjQYT3qjv2rk35u2LvwWr5t1r41JjjQYTjQYT5yJL6yIH6zIn83rL705j94rzjQYTjQYTKqpj337LjQYT3qFHkRYf847/lRYf95sX5slWd4dxZxrlCtp58vp2Yv5Kkp5G+up0Wt6k4uqlSvKfQ272o0YWt4NTTwVTa0IfjQYT72aP61KX1g7P6tdH0ibfzcaj6p8jzXZ36ncP7rs37utRNSX6mdqY3NnaVjJU8OndBQHlaVoHAtaVUUYBhXoRZVoJpZYVzbouHgJCNxE+eyFCqx1CzxFC6w1DEwVDMwE/Wvk/evVDpu1DjQYT4r034s1D4t1n5vWf5w3X6xn76yoX7z5H705n71Z7616P826z837T84bn848D958j+6dBfv6pzwKeBxKaPxqebyamkyqerzKezzKe7z6lKvKnd59ygzYWDvk/97daj1cPJ3813uE/+8uF1z8OBu5BusGtUpExor07+8Nv+9elfq4U3lU6sx1XB04Td163w37Ts8++ZymLl27HE0qnP1KvT1Kv++fH++vn3xc3drtH////5q273nk72ttPUnMn3pE74u9XJfLjRrc77za/75/LDdbb4qlD7wNj2xd29crXY1dHy8ub2lUzyr8+1bLLzrqn3qpL3moH2jGT1f0zzeEj0hEr1iEr1jUyvZrDzcU3yZ0fuqc3spMronsf31+jDgbPil8TekcGnX63Pg7rajcDVib6gXKzdn7CXVamJTKWhdbDJvs6sop3QqZShmJmmnZo+PHhhXYS0bLOknq+XkJack5exp55taYqRiJO3rKCBe4+JgZFKSHxSUH+yqcB8do2/s6NzbYp4cotaV4JoZIVjODowAAAAaHRSTlMAZfugL62AAyHGFtEH4HxyJl+Nrcnb4dK8onJIEVzEl2RFL1Tq/v7dpYZUOJj0IOi8jHZfHGoM5vn0M8M8qexNquLlz6du7f3538KNrumB27p+3I1wqlyWtOBK07r+mOAu39af/PbmWr6y1IwAAAABYktHRK0gYsIdAAAACXBIWXMAABYlAAAWJQFJUiTwAAAAB3RJTUUH5gMVDhs12bWt8QAAAAFvck5UAc+id5oAAAcCSURBVFjDrdh5XBRVHADwVx5jOMpeLHslyuxGwC6iDgtuhJWInWtadtCFWSndrrcdCmgiECW4ooblkRarA4gCIZCkqOWVZiomooZRaQoe6b+992Zn9piZBXN/f/GB3S/v+L3fOwC4xbhjambmnb16gyBFn8zMN996uy8RHK0f1t65KyQ4XC+svds/OFxvVnuvPxkUbgCrvT8wOF11a9OC0te+nBb6P74skyuUqjC1Wh2u0er0BgDu5rRBgs9GDBw8JFKyySSlNTp8wqSS8do0wTzcA2P69BlRg+6NDo2J9WVlZotDEGrCo8X5a5FRWJs5c9asWbNnz5k7ND5+2PARsXEEoLUmh0moJXjGTagBa4SPNnfevA8+/OjjjxPjDOG8kBSu0ig1KjXEjSTow2tWgUaAwSNFtPmJIYYwJBm1lIxblKQtgQLWTE6LEh3ryCgRbcF9JB2WpE0WVhFei5CYuggRLet+QBtQ460h/dwRYiWBdQCvDZFKhZgUoZY9DFKjHngQxkMoRo8enZo6hq1ISOsPKJu4RsQNEmo5I0alpaWNfXjhokWfLM7NXZKXX/DIo1N5LZIIs0hxZLRAy855LG3s2E8XYm0x1AoKP/uc16IBBedIJsGBmKECbWmRR1tSXJi3bBmvPU4AFUpmg9TgxcULNOdyqJUszi3Kyy1ZsXLVKl6DGIkTUiNZ08lof+2L0uWwbau/RLHSS3siOhYAWo04nXTBGDLUT3OWLkQ9XY0wTvtqzdp1iYiToXVskRg6QNAgJN5fW8+OW0nJilVI+3rNmrUbNqyDHFylFO6rhGYwJgBiuJ8GOW5O8/MLNn7z7VqsrUsMhctUizi9uGZ3OOSAiB3nq5UV8lrBRo+2YFws+v9QCxfFrPBPJjmc2yd9NVexmJaVNRIurQTJxslRbdXCgjt+03wfbXORUHMy5RVRJCDRvCrFNJSNDhmwySdUblrgo20p8tOWMoyzoqIqlG1cEi3EaHaCCOqprZWVrhxvbVt1rreW7WIYpgZqKSQwoCyRCzU8BHIQR02AWi3j9Na+25zLaxvKGBTlUKsbzE6rSFeVqM1WkPz0dqjVM0xZjpfW0Mhp32OLKd2BNLgJ6lEGC5eXkc1EaiLSmtA3nF5aQyPWChl31GBteggwoK3IJjpsdjijE7BWi75S5qX9UJyXX+xi2+XMySpntRh28gQDp2czx/rMdqzV4++5Sp281tBYs3NXc/PuPXv3/PjTvh2sBvcGM/yeQnQSZMA6ntWa6mtxQ8p4bX/DgYMwDuz11uDA6cSmwY40AhiedWuHfj586AgEXby2/5cDAi2F7ZTKX1OgzROuvIke7fDRo79W1rp47dhxgTYDABv8Ypi/hrqvhtpzvtqJlpYtLZx28reDB/f6agSQsc3oqXZ8G6+d2unfNgIng0Wqp4G1U7sEmk1Ms7O/9B83f+1Us/+4JbOdEskQg8+cimmtJ5v95pQSK5i4xts8+SbVttbTuzmtDuebXWxvsLElhERrYesR6ba1tlbvZrWaeXgtoCJiFmymSfi3BPX89q0ul6R2uq3tzNl9UKvIZsrxOlWLbqoqNgvTX4DFjWlqqqwUaseq286c+/0M5PaVO5kyXENQujmExzyFe6G2n29DmuuIn/ZHdVvHn3+dQ9rfZ3fBNZeD6xsu5cJbkp4tSeDF9vYLhfUuptZH29LWcfHiS5z2TyMqcVV17oIkskGTFnyAB+ntMDrWMy6P1lLdcenyZW8NcXOqUth1JXoWwRs3hRvX3tl1hdNOVF+5evWSv3atkKmrcm8LJpE9Cyc1ykPUuM7OzgvHkbb5ytXr1/8V0a7loP2UhplgEj+JhLMpB15GWGfXjdOwVTdvXpDQrqG9PsBBBC8HI1xdryCuq+vGjfPnA2hj3N1RSRyS8G6vBSCjswfaJHjcDwtwRmL/FeprevfapBB3P5WSZ0tUMh3o3J7enTapn7vsWGhJzYqPsujcnt4VUEMYZWIvhdJhS8KXUMhlvBpAS4XnSiopcD+5QxxsHewsOfk1KW0Mqq7s5dUc+B5vZ6+kqAMZk0W1VNhL0sxdXnWBOffnlDT2XvfXkAVsnquwiQrMKdiPWXSozpDpk9/waKlT0H3ZYPa+oktd3bjQ4Q+bHGqdFV3AgCFjCooM9hGBViT5XvWNdGBOz71bWLR60utiB2G5Rvh0EG4IzNEazxOBRiFPltEG2qZPMIeLvEKgItLdcxylFn7J60et3edP5u5efEid0SERJqWNy6Qe5gl+99GIdUytkHlnUs/yBI+7Qa706bFFY+c3O0LpuIU84W+GyVSCHYZOrqd9RpvU3EqedD8UKp88ud2HPa+3JodJd7ua+2qPLGNyEJ4dZe400tAgGGHDL36KIL1Ng2SLw+JOtv8Aax/72g6rujwAAAAldEVYdGRhdGU6Y3JlYXRlADIwMjItMDMtMjFUMTQ6Mjc6NTArMDA6MDBUKA+xAAAAJXRFWHRkYXRlOm1vZGlmeQAyMDIyLTAzLTIxVDE0OjI3OjUwKzAwOjAwJXW3DQAAAABJRU5ErkJggg==)](https://qbeast.io)

 ---
diff --git a/src/main/scala/io/qbeast/spark/index/query/QueryFiltersUtils.scala b/src/main/scala/io/qbeast/spark/index/query/QueryFiltersUtils.scala
index e5226eb82..901c33acf 100644
--- a/src/main/scala/io/qbeast/spark/index/query/QueryFiltersUtils.scala
+++ b/src/main/scala/io/qbeast/spark/index/query/QueryFiltersUtils.scala
@@ -7,10 +7,15 @@
 import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, getZoneId}
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.execution.InSubqueryExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DateType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
+import java.util.concurrent.TimeUnit
+
 private[query] trait QueryFiltersUtils {

   lazy val spark: SparkSession = SparkSession.active
@@ -91,15 +96,25 @@
   }

   /**
-   * Convert an Spark String type to a Scala core type
-   * @param value the value to convert
+   * Convert a Literal value from Spark to a Qbeast/Scala core type
+   * @param l the Literal to convert
    * @return
    */
-  def sparkTypeToCoreType(value: Any): Any = {
-    value match {
-      case s: UTF8String => s.toString
-      case _ => value
+  def sparkTypeToCoreType(l: Literal): Any = {
+
+    (l.value, l.dataType) match {
+      case (int: Integer, _: DateType) =>
+        // convert DateType days to milliseconds since the epoch
+        lazy val zoneId = getZoneId(SQLConf.get.sessionLocalTimeZone)
+        val dateInMicros = daysToMicros(int, zoneId)
+        val dateInMillis = TimeUnit.MICROSECONDS.toMillis(dateInMicros)
+        dateInMillis
+      case (long: Long, _: TimestampType) =>
+        // convert TimestampType microseconds to milliseconds
+        TimeUnit.MICROSECONDS.toMillis(long)
+      case (s: UTF8String, _) => s.toString
+      case _ => l.value
     }
   }
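Note on the sparkTypeToCoreType change above: Spark encodes a DateType literal as days since 1970-01-01 (an Int) and a TimestampType literal as microseconds since the epoch (a Long), while the comments in the patch show the index works in milliseconds, so the method now needs the Literal's dataType to pick the right conversion. A minimal sketch of the same conversion path, assuming a UTC session time zone (the values and names below are illustrative, not part of the patch):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, getZoneId}

    val days = 17169 // a DateType literal value for 2017-01-03
    val micros = daysToMicros(days, getZoneId("UTC")) // midnight, in microseconds
    val millis = TimeUnit.MICROSECONDS.toMillis(micros)
    // millis == 1483401600000L, i.e. 2017-01-03 00:00:00 UTC in milliseconds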
diff --git a/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala b/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala
index 941cdd2b1..30aa07932 100644
--- a/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala
+++ b/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala
@@ -88,23 +88,23 @@ private[spark] class QuerySpecBuilder(sparkFilters: Seq[Expression])
       // if not found, use the overall coordinates
       val columnFrom = columnFilters
         .collectFirst {
-          case GreaterThan(_, Literal(value, _)) => sparkTypeToCoreType(value)
-          case GreaterThanOrEqual(_, Literal(value, _)) => sparkTypeToCoreType(value)
-          case LessThan(Literal(value, _), _) => sparkTypeToCoreType(value)
-          case LessThanOrEqual(Literal(value, _), _) => sparkTypeToCoreType(value)
-          case EqualTo(_, Literal(value, _)) => sparkTypeToCoreType(value)
-          case EqualTo(Literal(value, _), _) => sparkTypeToCoreType(value)
+          case GreaterThan(_, l: Literal) => sparkTypeToCoreType(l)
+          case GreaterThanOrEqual(_, l: Literal) => sparkTypeToCoreType(l)
+          case LessThan(l: Literal, _) => sparkTypeToCoreType(l)
+          case LessThanOrEqual(l: Literal, _) => sparkTypeToCoreType(l)
+          case EqualTo(_, l: Literal) => sparkTypeToCoreType(l)
+          case EqualTo(l: Literal, _) => sparkTypeToCoreType(l)
           case IsNull(_) => null
         }

       val columnTo = columnFilters
         .collectFirst {
-          case LessThan(_, Literal(value, _)) => sparkTypeToCoreType(value)
-          case LessThanOrEqual(_, Literal(value, _)) => sparkTypeToCoreType(value)
-          case GreaterThan(Literal(value, _), _) => sparkTypeToCoreType(value)
-          case GreaterThanOrEqual(Literal(value, _), _) => sparkTypeToCoreType(value)
-          case EqualTo(_, Literal(value, _)) => sparkTypeToCoreType(value)
-          case EqualTo(Literal(value, _), _) => sparkTypeToCoreType(value)
+          case LessThan(_, l: Literal) => sparkTypeToCoreType(l)
+          case LessThanOrEqual(_, l: Literal) => sparkTypeToCoreType(l)
+          case GreaterThan(l: Literal, _) => sparkTypeToCoreType(l)
+          case GreaterThanOrEqual(l: Literal, _) => sparkTypeToCoreType(l)
+          case EqualTo(_, l: Literal) => sparkTypeToCoreType(l)
+          case EqualTo(l: Literal, _) => sparkTypeToCoreType(l)
           case IsNull(_) => null
         }
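The pattern change above is what feeds that conversion: destructuring with Literal(value, _) threw the dataType away, so by the time sparkTypeToCoreType received the value, a date was indistinguishable from a plain integer. Binding the whole Literal (l: Literal) keeps the type alongside the value. A hypothetical REPL probe (not part of the patch) showing what the old Any-based signature lost:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import java.sql.Date

    val l = Literal(Date.valueOf("2017-01-03"))
    l.value    // 17169: an Int, identical in shape to an IntegerType literal's value
    l.dataType // DateType: the information needed to choose the date conversion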
12:02:00", + "2017-01-01 12:02:00") + .toDF("date") + .withColumn("date", to_date($"date")) + + df.write.format("qbeast").option("columnsToIndex", "date").save(tmpDir) + + val indexed = spark.read.format("qbeast").load(tmpDir) + + indexed.filter("date == '2017-01-02 12:02:00'").count() shouldBe 3 + + }) + + it should "filter correctly values with Unix Timestamp" in withSparkAndTmpDir( + (spark, tmpDir) => { + import spark.implicits._ + val df = + Seq( + "2017-01-03 12:02:00", + "2017-01-02 12:02:00", + "2017-01-02 12:02:00", + "2017-01-02 12:02:00", + "2017-01-01 12:02:00", + "2017-01-01 12:02:00") + .toDF("date") + .withColumn("date", unix_timestamp($"date")) + + df.write.format("qbeast").option("columnsToIndex", "date").save(tmpDir) + + val indexed = spark.read.format("qbeast").load(tmpDir) + val valueToFilter = df.first().getLong(0) + + indexed.filter(s"date == $valueToFilter").count() shouldBe df + .filter(s"date == $valueToFilter") + .count() + + }) +}