diff --git a/core/src/main/scala/io/qbeast/core/model/CubeId.scala b/core/src/main/scala/io/qbeast/core/model/CubeId.scala index 45bb60f52..23f929e38 100644 --- a/core/src/main/scala/io/qbeast/core/model/CubeId.scala +++ b/core/src/main/scala/io/qbeast/core/model/CubeId.scala @@ -6,7 +6,6 @@ package io.qbeast.core.model import io.qbeast.core.model.CubeId.{ChildrenIterator, Codec} import java.nio.ByteBuffer -import java.util.Arrays import scala.collection.immutable.BitSet import scala.collection.mutable @@ -95,20 +94,6 @@ object CubeId { containers(point).drop(depth).next() } - private def trimBitMask(bitMask: Array[Long]): Array[Long] = { - var last = bitMask.length - 1 - while (last >= 0 && bitMask(last) == 0) { - last -= 1 - } - if (last < bitMask.length - 1) { - val trimmedBitMask = new Array[Long](last + 1) - Array.copy(bitMask, 0, trimmedBitMask, 0, trimmedBitMask.length) - trimmedBitMask - } else { - bitMask - } - } - private class ContainersIterator(point: Point, parent: Option[CubeId]) extends Iterator[CubeId] { @@ -270,14 +255,17 @@ case class CubeId(dimensionCount: Int, depth: Int, bitMask: Array[Long]) * is less than, equal to, or greater than the other CubeId. */ override def compare(that: CubeId): Int = { - val thisBitset = BitSet.fromBitMaskNoCopy(bitMask) - val thatBitset = BitSet.fromBitMaskNoCopy(that.bitMask) - val commonDepth = math.min(depth, that.depth) - for (depthOffset <- 0.until(commonDepth * dimensionCount)) { - val firstBit = thisBitset.contains(depthOffset) - val secondBit = thatBitset.contains(depthOffset) - if (firstBit != secondBit) { - if (firstBit) { + require( + that.dimensionCount == dimensionCount, + "The two cubes must have the same dimension count.") + val thisBits = BitSet.fromBitMaskNoCopy(bitMask) + val thatBits = BitSet.fromBitMaskNoCopy(that.bitMask) + val end = dimensionCount * math.min(depth, that.depth) + for (i <- (0 until end)) { + val thisBit = thisBits.contains(i) + val thatBit = thatBits.contains(i) + if (thisBit != thatBit) { + if (thisBit) { return 1 } else { return -1 @@ -302,14 +290,13 @@ case class CubeId(dimensionCount: Int, depth: Int, bitMask: Array[Long]) require( other.dimensionCount == dimensionCount, "The two cubes must have the same dimension count.") - if (depth > other.depth) { - false - } else { - val end = dimensionCount * depth - val ancestorBitMask = BitSet.fromBitMaskNoCopy(other.bitMask).until(end).toBitMask - Arrays.equals(CubeId.trimBitMask(bitMask), CubeId.trimBitMask(ancestorBitMask)) + return false } + val end = dimensionCount * depth + val bits = BitSet.fromBitMaskNoCopy(bitMask) + val otherBits = BitSet.fromBitMask(other.bitMask).until(end) + bits == otherBits } /** @@ -402,9 +389,8 @@ case class CubeId(dimensionCount: Int, depth: Int, bitMask: Array[Long]) override def equals(obj: Any): Boolean = obj match { case other: CubeId => - dimensionCount == other.dimensionCount && depth == other.depth && Arrays.equals( - CubeId.trimBitMask(bitMask), - CubeId.trimBitMask(other.bitMask)) + dimensionCount == other.dimensionCount && depth == other.depth && BitSet.fromBitMaskNoCopy( + bitMask) == BitSet.fromBitMaskNoCopy(other.bitMask) case _ => false } @@ -413,7 +399,7 @@ case class CubeId(dimensionCount: Int, depth: Int, bitMask: Array[Long]) var result = 1 result = prime * result + dimensionCount result = prime * result + depth - result = prime * result + Arrays.hashCode(CubeId.trimBitMask(bitMask)) + result = prime * result + BitSet.fromBitMaskNoCopy(bitMask).hashCode() result } diff --git 
a/core/src/main/scala/io/qbeast/core/transform/HashTransformation.scala b/core/src/main/scala/io/qbeast/core/transform/HashTransformation.scala index e8f98c1cb..d736e2246 100644 --- a/core/src/main/scala/io/qbeast/core/transform/HashTransformation.scala +++ b/core/src/main/scala/io/qbeast/core/transform/HashTransformation.scala @@ -30,4 +30,5 @@ case class HashTransformation(nullValue: Any = Random.nextInt()) extends Transfo override def isSupersededBy(newTransformation: Transformation): Boolean = false override def merge(other: Transformation): Transformation = this + } diff --git a/core/src/main/scala/io/qbeast/core/transform/HistogramTransformation.scala b/core/src/main/scala/io/qbeast/core/transform/HistogramTransformation.scala new file mode 100644 index 000000000..e40d85279 --- /dev/null +++ b/core/src/main/scala/io/qbeast/core/transform/HistogramTransformation.scala @@ -0,0 +1,29 @@ +package io.qbeast.core.transform + +import io.qbeast.core.model.QDataType + +trait HistogramTransformation extends Transformation { + + /** + * QDataType for the associated column. + */ + def dataType: QDataType + + /** + * Histogram of the associated column that reflects the distribution of the column values. + * @return + */ + def histogram: IndexedSeq[Any] + + /** + * Determines whether the associated histogram is the default one + * @return + */ + def isDefault: Boolean + + override def transform(value: Any): Double + + override def isSupersededBy(newTransformation: Transformation): Boolean + + override def merge(other: Transformation): Transformation +} diff --git a/core/src/main/scala/io/qbeast/core/transform/HistogramTransformer.scala b/core/src/main/scala/io/qbeast/core/transform/HistogramTransformer.scala new file mode 100644 index 000000000..f39e6ba83 --- /dev/null +++ b/core/src/main/scala/io/qbeast/core/transform/HistogramTransformer.scala @@ -0,0 +1,43 @@ +package io.qbeast.core.transform + +import io.qbeast.core.model.{QDataType, StringDataType} + +object HistogramTransformer extends TransformerType { + override def transformerSimpleName: String = "histogram" + + override def apply(columnName: String, dataType: QDataType): Transformer = dataType match { + case StringDataType => StringHistogramTransformer(columnName, dataType) + case dt => throw new Exception(s"DataType not supported for HistogramTransformers: $dt") + } + + // "a" to "z" + def defaultStringHistogram: IndexedSeq[String] = (97 to 122).map(_.toChar.toString) +} + +trait HistogramTransformer extends Transformer { + + override protected def transformerType: TransformerType = HistogramTransformer + + /** + * Returns the name of the column + * + * @return + */ + override def columnName: String + + /** + * Returns the stats + * + * @return + */ + override def stats: ColumnStats + + /** + * Returns the Transformation given a row representation of the values + * + * @param row the values + * @return the transformation + */ + override def makeTransformation(row: String => Any): Transformation + +} diff --git a/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala b/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala index abb01f92e..83fce6990 100644 --- a/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala +++ b/core/src/main/scala/io/qbeast/core/transform/LinearTransformer.scala @@ -49,7 +49,7 @@ case class LinearTransformer(columnName: String, dataType: QDataType) extends Tr } else if (minAux == maxAux) { // If both values are equal we return an IdentityTransformation 
      IdentityToZeroTransformation(minAux)
-    } else { // otherwhise we pick the min and max
+    } else { // otherwise we pick the min and max
       val min = getValue(minAux)
       val max = getValue(maxAux)
       dataType match {
diff --git a/core/src/main/scala/io/qbeast/core/transform/StringHistogramTransformation.scala b/core/src/main/scala/io/qbeast/core/transform/StringHistogramTransformation.scala
new file mode 100644
index 000000000..47f496d56
--- /dev/null
+++ b/core/src/main/scala/io/qbeast/core/transform/StringHistogramTransformation.scala
@@ -0,0 +1,130 @@
+package io.qbeast.core.transform
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator, JsonParser, TreeNode}
+import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.databind.ser.std.StdSerializer
+import com.fasterxml.jackson.databind.{DeserializationContext, SerializerProvider}
+import io.qbeast.core.model.{QDataType, StringDataType}
+import io.qbeast.core.transform.HistogramTransformer.defaultStringHistogram
+
+import scala.collection.Searching._
+
+@JsonSerialize(using = classOf[StringHistogramTransformationSerializer])
+@JsonDeserialize(using = classOf[StringHistogramTransformationDeserializer])
+case class StringHistogramTransformation(histogram: IndexedSeq[String])
+    extends HistogramTransformation {
+  require(histogram.length > 1, s"Histogram length has to be > 1: ${histogram.length}")
+
+  override val dataType: QDataType = StringDataType
+
+  override def isDefault: Boolean = histogram == defaultStringHistogram
+
+  /**
+   * Converts a String value to a normalized value using its position
+   * in the sorted histogram.
+   *
+   * @param value the value to convert
+   * @return a Double between 0 and 1
+   */
+  override def transform(value: Any): Double = {
+    val v: String = value match {
+      case s: String => s
+      case null => "null" // nulls are indexed as the literal string "null"
+      case _ => value.toString
+    }
+
+    // InsertionPoint ranges from 0 to histogram.length
+    histogram.search(v) match {
+      case Found(foundIndex) => foundIndex.toDouble / (histogram.length - 1)
+      case InsertionPoint(insertionPoint) =>
+        if (insertionPoint == 0) 0d
+        else if (insertionPoint == histogram.length) 1d
+        else (insertionPoint - 1).toDouble / (histogram.length - 1)
+    }
+  }
+
+  /**
+   * This method should determine if the new data will cause the creation of a new revision.
+   *
+   * @param newTransformation the new transformation created with statistics over the new data
+   * @return true if the domain of the newTransformation is not fully contained in this one.
+   */
+  override def isSupersededBy(newTransformation: Transformation): Boolean =
+    newTransformation match {
+      case nt @ StringHistogramTransformation(hist) =>
+        if (isDefault) !nt.isDefault
+        else if (nt.isDefault) false
+        else !(histogram == hist)
+      case _ => false
+    }
+
+  /**
+   * Merges two transformations. The domain of the resulting transformation is the union of
+   * this transformation's domain and the other's.
+   *
+   * @param other Transformation
+   * @return a new Transformation that contains both this and other.
+ */ + override def merge(other: Transformation): Transformation = other match { + case _: StringHistogramTransformation => other + case _ => this + } + +} + +class StringHistogramTransformationSerializer + extends StdSerializer[StringHistogramTransformation](classOf[StringHistogramTransformation]) { + val jsonFactory = new JsonFactory() + + override def serializeWithType( + value: StringHistogramTransformation, + gen: JsonGenerator, + serializers: SerializerProvider, + typeSer: TypeSerializer): Unit = { + gen.writeStartObject() + typeSer.getPropertyName + gen.writeStringField(typeSer.getPropertyName, typeSer.getTypeIdResolver.idFromValue(value)) + + gen.writeFieldName("histogram") + gen.writeStartArray() + value.histogram.foreach(gen.writeString) + gen.writeEndArray() + + gen.writeEndObject() + } + + override def serialize( + value: StringHistogramTransformation, + gen: JsonGenerator, + provider: SerializerProvider): Unit = { + gen.writeStartObject() + + gen.writeFieldName("histogram") + gen.writeStartArray() + value.histogram.foreach(gen.writeString) + gen.writeEndArray() + + gen.writeEndObject() + } + +} + +class StringHistogramTransformationDeserializer + extends StdDeserializer[StringHistogramTransformation]( + classOf[StringHistogramTransformation]) { + + override def deserialize( + p: JsonParser, + ctxt: DeserializationContext): StringHistogramTransformation = { + val histogramBuilder = IndexedSeq.newBuilder[String] + + val tree: TreeNode = p.getCodec.readTree(p) + tree.get("histogram") match { + case an: ArrayNode => + (0 until an.size()).foreach(i => histogramBuilder += an.get(i).asText()) + } + + StringHistogramTransformation(histogramBuilder.result()) + } + +} diff --git a/core/src/main/scala/io/qbeast/core/transform/StringHistogramTransformer.scala b/core/src/main/scala/io/qbeast/core/transform/StringHistogramTransformer.scala new file mode 100644 index 000000000..11081d038 --- /dev/null +++ b/core/src/main/scala/io/qbeast/core/transform/StringHistogramTransformer.scala @@ -0,0 +1,37 @@ +package io.qbeast.core.transform + +import io.qbeast.core.model.QDataType +import io.qbeast.core.transform.HistogramTransformer.defaultStringHistogram + +case class StringHistogramTransformer(columnName: String, dataType: QDataType) + extends HistogramTransformer { + private val columnHistogram = s"${columnName}_histogram" + + /** + * Returns the stats + * + * @return + */ + override def stats: ColumnStats = { + val defaultHistString = defaultStringHistogram.mkString("Array('", "', '", "')") + ColumnStats( + statsNames = columnHistogram :: Nil, + statsSqlPredicates = s"$defaultHistString AS $columnHistogram" :: Nil) + } + + /** + * Returns the Transformation given a row representation of the values + * + * @param row the values + * @return the transformation + */ + override def makeTransformation(row: String => Any): Transformation = { + val hist = row(columnHistogram) match { + case h: Seq[_] => h.map(_.toString).toIndexedSeq + case _ => defaultStringHistogram + } + + StringHistogramTransformation(hist) + } + +} diff --git a/core/src/main/scala/io/qbeast/core/transform/Transformer.scala b/core/src/main/scala/io/qbeast/core/transform/Transformer.scala index cf5456417..07d20390a 100644 --- a/core/src/main/scala/io/qbeast/core/transform/Transformer.scala +++ b/core/src/main/scala/io/qbeast/core/transform/Transformer.scala @@ -14,7 +14,9 @@ import java.util.Locale object Transformer { private val transformersRegistry: Map[String, TransformerType] = - Seq(LinearTransformer, HashTransformer).map(a 
=> (a.transformerSimpleName, a)).toMap + Seq(LinearTransformer, HashTransformer, HistogramTransformer) + .map(a => (a.transformerSimpleName, a)) + .toMap /** * Returns the transformer for the given column and type of transformer diff --git a/core/src/test/scala/io/qbeast/core/model/CubeIdTest.scala b/core/src/test/scala/io/qbeast/core/model/CubeIdTest.scala index d8fd4edc4..d4858377e 100644 --- a/core/src/test/scala/io/qbeast/core/model/CubeIdTest.scala +++ b/core/src/test/scala/io/qbeast/core/model/CubeIdTest.scala @@ -28,6 +28,10 @@ class CubeIdTest extends AnyFlatSpec with Matchers { val id7 = CubeId(2, "wQwwwQwwQwwQwwwwwQwQwwwQQwwwwQQwQwwwQwwQwwQwwwwwQwwQQQQQQQQQQQQQ") id6 == id7 shouldBe true + val id8 = + CubeId(1, 4, Array(9L)).parent.get.parent.get.parent.get + val id9 = CubeId(1, 1, Array(1L)) + id8 == id9 shouldBe true } it should "implement hashCode correctly" in { @@ -153,6 +157,13 @@ class CubeIdTest extends AnyFlatSpec with Matchers { id4.nextSibling shouldBe None } + it should "implement children iterator throwing NoSuchElementException after last child" in { + val children = CubeId.root(1).children + children.next() shouldBe CubeId.root(1).firstChild + children.next() shouldBe CubeId.root(1).firstChild.nextSibling.get + assertThrows[NoSuchElementException](children.next()) + } + it should "return a correct container with specified depth" in { val point = Point(0.66, 0.83) val id = CubeId.container(point, 2) diff --git a/core/src/test/scala/io/qbeast/core/model/JSONSerializationTests.scala b/core/src/test/scala/io/qbeast/core/model/JSONSerializationTests.scala index 2e6594d83..389a9e72d 100644 --- a/core/src/test/scala/io/qbeast/core/model/JSONSerializationTests.scala +++ b/core/src/test/scala/io/qbeast/core/model/JSONSerializationTests.scala @@ -1,6 +1,11 @@ package io.qbeast.core.model -import io.qbeast.core.transform.{HashTransformation, LinearTransformation, Transformation, Transformer} +import io.qbeast.core.transform.{ + HashTransformation, + LinearTransformation, + Transformation, + Transformer +} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers diff --git a/core/src/test/scala/io/qbeast/core/transform/EmptyTransformationTest.scala b/core/src/test/scala/io/qbeast/core/transform/EmptyTransformationTest.scala index fa172af3e..e7f61ffc6 100644 --- a/core/src/test/scala/io/qbeast/core/transform/EmptyTransformationTest.scala +++ b/core/src/test/scala/io/qbeast/core/transform/EmptyTransformationTest.scala @@ -1,6 +1,7 @@ package io.qbeast.core.transform import io.qbeast.core.model.DoubleDataType +import io.qbeast.core.transform.HistogramTransformer.defaultStringHistogram import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -19,17 +20,21 @@ class EmptyTransformationTest extends AnyFlatSpec with Matchers { val et = EmptyTransformation() val ht = HashTransformation() val lt = LinearTransformation(1d, 1.1, DoubleDataType) + val sht = StringHistogramTransformation(defaultStringHistogram) et.isSupersededBy(ht) shouldBe true et.isSupersededBy(lt) shouldBe true + et.isSupersededBy(sht) shouldBe true } it should "return the other Transformation when merging" in { val et = EmptyTransformation() val ht = HashTransformation() val lt = LinearTransformation(1d, 1.1, DoubleDataType) + val sht = StringHistogramTransformation(defaultStringHistogram) et.merge(ht) shouldBe ht et.merge(lt) shouldBe lt + et.merge(sht) shouldBe sht } } diff --git 
a/core/src/test/scala/io/qbeast/core/transform/StringHistogramTransformationTest.scala b/core/src/test/scala/io/qbeast/core/transform/StringHistogramTransformationTest.scala new file mode 100644 index 000000000..99d4c3f3d --- /dev/null +++ b/core/src/test/scala/io/qbeast/core/transform/StringHistogramTransformationTest.scala @@ -0,0 +1,77 @@ +package io.qbeast.core.transform + +import io.qbeast.core.transform.HistogramTransformer.defaultStringHistogram +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.util.Random + +class StringHistogramTransformationTest extends AnyFlatSpec with Matchers { + + "A StringHistogramTransformation" should "map values to [0d, 1d]" in { + val attempts = 10 + val sht = StringHistogramTransformation(defaultStringHistogram) + val minAsciiEnc = 32 // SPACE + val maxAsciiEnc = 126 // ~ + (1 to attempts).foreach { _ => + val inputStr = (1 to 5) + .map { _ => + val asciiEnc = minAsciiEnc + Random.nextInt(maxAsciiEnc - minAsciiEnc) + asciiEnc.toChar.toString + } + .mkString("") + val v = sht.transform(inputStr) + + v should be <= 1d + v should be >= 0d + } + } + + it should "properly handle null" in { + val sht = StringHistogramTransformation(defaultStringHistogram) + val v = sht.transform(null) + v should be <= 1d + v should be >= 0d + } + + it should "map existing values correctly" in { + val hist = Array(0.0, 0.25, 0.5, 0.75, 1.0).map(_.toString) + val sht = StringHistogramTransformation(hist) + hist.foreach(s => sht.transform(s) shouldBe s.toDouble) + } + + it should "map non-existing values correctly" in { + val hist = Array(0.0, 0.25, 0.5, 0.75, 1.0).map(_.toString) + val sht = StringHistogramTransformation(hist) + sht.transform((hist.head.toDouble - 1).toString) shouldBe 0d + sht.transform((hist.last.toDouble + 1).toString) shouldBe 1d + + hist.foreach { s => + val v = (s.toDouble + 0.1).toString + sht.transform(v) shouldBe s.toDouble + } + } + + it should "supersede correctly" in { + val defaultT = StringHistogramTransformation(defaultStringHistogram) + val customT_1 = + StringHistogramTransformation(Array("brand_A", "brand_B", "brand_C")) + val customT_2 = + StringHistogramTransformation(Array("brand_A", "brand_B", "brand_D")) + + defaultT.isSupersededBy(customT_1) shouldBe true + defaultT.isSupersededBy(defaultT) shouldBe false + + customT_1.isSupersededBy(defaultT) shouldBe false + customT_1.isSupersededBy(customT_1) shouldBe false + customT_1.isSupersededBy(customT_2) shouldBe true + } + + it should "have histograms with length > 1" in { + an[IllegalArgumentException] should be thrownBy + StringHistogramTransformation(Array.empty[String]) + + an[IllegalArgumentException] should be thrownBy + StringHistogramTransformation(Array("a")) + } +} diff --git a/core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala b/core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala index 2ac139524..465a6928d 100644 --- a/core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala +++ b/core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala @@ -23,6 +23,9 @@ class TransformerTest extends AnyFlatSpec with Matchers { Transformer(columnName, dataType).spec shouldBe "a:linear" Transformer("linear", columnName, dataType).spec shouldBe "a:linear" Transformer("hashing", columnName, dataType).spec shouldBe "a:hashing" + + an[Exception] should be thrownBy Transformer("histogram", columnName, dataType).spec + an[NoSuchElementException] should be thrownBy Transformer( "another", columnName, 
@@ -78,6 +81,22 @@ class TransformerTest extends AnyFlatSpec with Matchers {
   }
 
+  it should "makeTransformation with String histograms" in {
+    val columnName = "s"
+    val dataType = StringDataType
+    val transformer = Transformer("histogram", columnName, dataType)
+    transformer shouldBe a[StringHistogramTransformer]
+
+    val hist = Seq("str_1", "str_2", "str_3", "str_4", "str_5", "str_6")
+    val transformation = Map(s"${columnName}_histogram" -> hist)
+    transformer.makeTransformation(transformation) match {
+      case StringHistogramTransformation(histogram) =>
+        histogram shouldBe hist
+      case _ => fail("should always be StringHistogramTransformation")
+    }
+
+  }
+
   it should "return new transformation on maybeUpdateTransformation" in {
     val columnName = "a"
     val dataType = IntegerDataType
diff --git a/docs/AdvancedConfiguration.md b/docs/AdvancedConfiguration.md
index 7bb3b8eee..17611d45f 100644
--- a/docs/AdvancedConfiguration.md
+++ b/docs/AdvancedConfiguration.md
@@ -115,6 +115,58 @@ val columnStats =
   |"date_max":"${formatter.format(maxTimestamp)}" }""".stripMargin
 ```
+
+## String indexing via Histograms
+
+The default **String** column transformation (`HashTransformation`) offers limited range-query support because it does not preserve the lexicographic ordering of the String values.
+
+This can be addressed by providing a custom **String** histogram in the form of a sorted `Seq[String]`, which can lead to several improvements, including:
+1. more efficient file pruning, thanks to tighter file-level column min/max values
+2. support for range queries on String columns
+3. improved overall query speed
+
+The following code snippet demonstrates the extraction of a **String** histogram from the source data:
+```scala
+import org.apache.spark.sql.delta.skipping.MultiDimClusteringFunctions
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.{col, min}
+
+def getStringHistogramStr(columnName: String, numBins: Int, df: DataFrame): String = {
+  val binStarts = "__bin_starts"
+  val stringPartitionColumn =
+    MultiDimClusteringFunctions.range_partition_id(col(columnName), numBins)
+  df
+    .select(columnName)
+    .distinct()
+    .na.drop()
+    .groupBy(stringPartitionColumn)
+    .agg(min(columnName).alias(binStarts))
+    .select(binStarts)
+    .orderBy(binStarts)
+    .collect()
+    .map { r =>
+      val s = r.getAs[String](0)
+      s"'$s'"
+    }
+    .mkString("[", ",", "]")
+}
+
+val brandStats = getStringHistogramStr("brand", 50, df)
+val statsStr = s"""{"brand_histogram":$brandStats}"""
+
+df.write
+  .mode("overwrite")
+  .format("qbeast")
+  .option("columnsToIndex", "brand:histogram")
+  .option("columnStats", statsStr)
+  .save(targetPath)
+```
+
+This is only necessary for the first write: unless a new one is explicitly provided, all subsequent appends reuse the same histogram.
+Any new custom histogram provided during appends forces the creation of a new `Revision`.
+
+A default **String** histogram ("a" to "z") is used when histogram indexing is requested (`stringColName:histogram`) but no histogram is provided in `columnStats`.
+The default histogram cannot supersede an existing `StringHistogramTransformation`.
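+
+As a quick illustration of the range-query benefit (a minimal sketch; `targetPath` refers to the table written above, and the `brand` literals are placeholders):
+```scala
+import org.apache.spark.sql.functions.col
+
+// With a histogram transformation, data files keep tight min/max values for
+// `brand`, so a lexicographic range predicate can skip most files; under the
+// default HashTransformation the same query would scan every file.
+spark.read
+  .format("qbeast")
+  .load(targetPath)
+  .where(col("brand") >= "brand_A" && col("brand") <= "brand_C")
+  .show()
+```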
 
 ## DefaultCubeSize
 
 If you don't specify the cubeSize at DataFrame level, the default value is used.
 This is set to 5M, so if you want to change it
diff --git a/src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala b/src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala
index c396c7943..180fcc49b 100644
--- a/src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala
+++ b/src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala
@@ -132,13 +132,27 @@ private[delta] class QbeastMetadataOperation extends ImplicitMetadataOperation w
     // Merged schema will contain additional columns at the end
     val isNewSchema: Boolean = txn.metadata.schema != mergedSchema
     // Either the data triggered a new revision or the user specified options to amplify the ranges
-    val isQbeastMetadata: Boolean = txn.metadata.configuration.contains(lastRevisionID)
+    val containsQbeastMetadata: Boolean = txn.metadata.configuration.contains(lastRevisionID)
+
+    // TODO This whole class is starting to be messy, and contains a lot of IF then ELSE
+    // TODO We should refactor QbeastMetadataOperation to make it more readable and usable
+    // TODO Ideally, we should delegate this process to the underlying format
+
+    // Append on an empty table
+    val isNewWriteAppend = !isOverwriteMode && txn.readVersion == -1
+    // The table exists, but the user-provided options advanced the revision
+    val isUserUpdatedMetadata =
+      containsQbeastMetadata &&
+        tableChanges.updatedRevision.revisionID == txn.metadata
+          .configuration(lastRevisionID)
+          .toInt + 1
+
+    // A new revision is created when:
+    // 1. the data triggered a new revision,
+    // 2. the user added columnStats that triggered a new revision, or
+    // 3. the user made an APPEND on a NEW TABLE with columnStats that triggered a new revision
     val isNewRevision: Boolean =
-      tableChanges.isNewRevision ||
-        (isQbeastMetadata &&
-          tableChanges.updatedRevision.revisionID == txn.metadata
-            .configuration(lastRevisionID)
-            .toInt + 1)
+      tableChanges.isNewRevision || isUserUpdatedMetadata || isNewWriteAppend
 
     val latestRevision = tableChanges.updatedRevision
     val baseConfiguration: Configuration =
diff --git a/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala b/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala
index e70ce9805..8c46ed124 100644
--- a/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala
+++ b/src/main/scala/io/qbeast/spark/index/OTreeDataAnalyzer.scala
@@ -9,6 +9,7 @@ import io.qbeast.core.transform.Transformer
 import io.qbeast.spark.index.QbeastColumns.{cubeToReplicateColumnName, weightColumnName}
 import io.qbeast.spark.internal.QbeastFunctions.qbeastHash
 import org.apache.spark.qbeast.config.CUBE_WEIGHTS_BUFFER_CAPACITY
+
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
@@ -246,7 +247,7 @@ object DoublePassOTreeDataAnalyzer extends OTreeDataAnalyzer with Serializable {
     }
 
     val spaceChanges =
-      if (isReplication) None
+      if (isReplication) None // the space does not change during replication
       else calculateRevisionChanges(dataFrameStats, indexStatus.revision)
 
     // The revision to use
diff --git a/src/test/scala/io/qbeast/spark/index/SparkRevisionFactoryTest.scala b/src/test/scala/io/qbeast/spark/index/SparkRevisionFactoryTest.scala
index 3e15ce437..3dbcbba95 100644
--- a/src/test/scala/io/qbeast/spark/index/SparkRevisionFactoryTest.scala
+++ b/src/test/scala/io/qbeast/spark/index/SparkRevisionFactoryTest.scala
@@ -153,6 +153,40 @@ class SparkRevisionFactoryTest extends QbeastIntegrationTestSpec {
 
   })
 
+  it should "createNewRevision with columnStats " +
+    "even on APPEND mode" in withSparkAndTmpDir((spark,
tmpDir) => { + import spark.implicits._ + val df = 0.to(10).map(i => T3(i, i * 2.0, s"$i", i * 1.2f)).toDF() + val columnStats = """{ "a_min": 0, "a_max": 20 }""" + + // On append mode, it already expects a RevisionChange, + // but in this case the change is defined by the user + // instead of triggered by the data + + // TODO: very special case + // TODO: a cleaner solution would be to change the API for IndexManager + // and allow to send a set of options + // with user-specific configurations to Index + + df.write + .mode("append") + .format("qbeast") + .option("columnsToIndex", "a") + .option("columnStats", columnStats) + .save(tmpDir) + + val qbeastSnapshot = + DeltaQbeastSnapshot(DeltaLog.forTable(spark, tmpDir).update()) + val latestRevision = qbeastSnapshot.loadLatestRevision + val transformation = latestRevision.transformations.head + + transformation should not be null + transformation shouldBe a[LinearTransformation] + transformation.asInstanceOf[LinearTransformation].minNumber shouldBe 0 + transformation.asInstanceOf[LinearTransformation].maxNumber shouldBe 20 + + }) + it should "createNewRevision with min max timestamp" in withSpark(spark => { import spark.implicits._ val data = Seq( diff --git a/src/test/scala/io/qbeast/spark/index/TransformerIndexingTest.scala b/src/test/scala/io/qbeast/spark/index/TransformerIndexingTest.scala index 17844eae2..b8e5e8a65 100644 --- a/src/test/scala/io/qbeast/spark/index/TransformerIndexingTest.scala +++ b/src/test/scala/io/qbeast/spark/index/TransformerIndexingTest.scala @@ -2,15 +2,86 @@ package io.qbeast.spark.index import io.qbeast.TestClasses._ import io.qbeast.spark.QbeastIntegrationTestSpec -import org.apache.spark.sql.functions.{to_date, to_timestamp} -import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.functions.{ + to_date, + to_timestamp, + col, + min, + substring, + abs, + sum, + ascii +} +import org.apache.spark.sql.delta.skipping.MultiDimClusteringFunctions +import org.apache.spark.sql.{Dataset, DataFrame, SparkSession} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.apache.spark.sql.delta.DeltaLog import scala.util.Random class TransformerIndexingTest extends AnyFlatSpec with Matchers with QbeastIntegrationTestSpec { + /** + * Compute String column histogram. 
+ * @param columnName String column name + * @param numBins number of bins for the histogram + * @param df DataFrame + * @return Sorted String histogram as a String + */ + def getStringHistogramStr(columnName: String, numBins: Int, df: DataFrame): String = { + val binStarts = "__bin_starts" + val stringPartitionColumn = + MultiDimClusteringFunctions.range_partition_id(col(columnName), numBins) + + df + .select(columnName) + .distinct() + .groupBy(stringPartitionColumn) + .agg(min(columnName).alias(binStarts)) + .select(binStarts) + .orderBy(binStarts) + .collect() + .map(r => s"'${r.getAs[String](0)}'") + .mkString("[", ",", "]") + } + + /** + * Compute weighted encoding distance for files: + * (ascii(string_col_max.head) - ascii(string_col_min.head)) * numRecords + */ + def computeColumnEncodingDist( + spark: SparkSession, + tablePath: String, + columnName: String): Long = { + import spark.implicits._ + + val dl = DeltaLog.forTable(spark, tablePath) + val js = dl + .update() + .allFiles + .select("stats") + .collect() + .map(r => r.getAs[String](0)) + .mkString("[", ",", "]") + val stats = spark.read.json(Seq(js).toDS()) + + stats + .select( + col(s"maxValues.$columnName").alias("__max"), + col(s"minValues.$columnName").alias("__min"), + col("numRecords")) + .withColumn("__max_start", substring(col("__max"), 0, 1)) + .withColumn("__min_start", substring(col("__min"), 0, 1)) + .withColumn("__max_ascii", ascii(col("__max_start"))) + .withColumn("__min_ascii", ascii(col("__min_start"))) + .withColumn("dist", abs(col("__max_ascii") - col("__min_ascii")) * col("numRecords")) + .select("dist") + .agg(sum("dist")) + .first() + .getAs[Long](0) + } + // Write source data indexing all columns and read it back private def writeAndReadDF(source: Dataset[_], tmpDir: String, spark: SparkSession) = { source.write @@ -350,4 +421,35 @@ class TransformerIndexingTest extends AnyFlatSpec with Matchers with QbeastInteg }) }) + + it should "create better file-level min-max with a String histogram" in withSparkAndTmpDir( + (spark, tmpDir) => { + val histPath = tmpDir + "/string_hist/" + val hashPath = tmpDir + "/string_hash/" + val colName = "brand" + + val df = loadTestData(spark) + + val colHistStr = getStringHistogramStr(colName, 50, df) + val statsStr = s"""{"${colName}_histogram":$colHistStr}""" + + df.write + .mode("overwrite") + .format("qbeast") + .option("cubeSize", "30000") + .option("columnsToIndex", s"$colName:histogram") + .option("columnStats", statsStr) + .save(histPath) + val histDist = computeColumnEncodingDist(spark, histPath, colName) + + df.write + .mode("overwrite") + .format("qbeast") + .option("columnsToIndex", colName) + .option("cubeSize", "30000") + .save(hashPath) + val hashDist = computeColumnEncodingDist(spark, hashPath, colName) + + histDist should be < hashDist + }) }
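
For reference, a worked example of how the `StringHistogramTransformation.transform` introduced above places values (a minimal sketch outside the diff; the five-bin histogram is made up, and the expected results follow from the `search` logic shown earlier):

```scala
import io.qbeast.core.transform.StringHistogramTransformation

// Five sorted bin starts; Found(i) maps to i / (n - 1), and an
// InsertionPoint p maps to (p - 1) / (n - 1), clamped to [0, 1].
val t = StringHistogramTransformation(IndexedSeq("a", "g", "m", "s", "y"))

t.transform("a") // 0.0  -> Found(0)
t.transform("m") // 0.5  -> Found(2) = 2 / 4
t.transform("h") // 0.25 -> InsertionPoint(2) = (2 - 1) / 4
t.transform("z") // 1.0  -> InsertionPoint(5) == histogram.length
```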