#207 RollupWriteStrategy initial implementation.
alexeiakimov committed Sep 12, 2023
1 parent cba6c82 commit a1215bc
Showing 4 changed files with 135 additions and 5 deletions.
@@ -289,7 +289,7 @@ trait TableChanges {
   val updatedRevision: Revision
   val deltaReplicatedSet: Set[CubeId]
   val announcedOrReplicatedSet: Set[CubeId]
+  def cubeDomains: Map[CubeId, Double]
   def cubeState(cubeId: CubeId): Option[String]
   def cubeWeight(cubeId: CubeId): Option[Weight]
-  def cubeDomain(cubeId: CubeId): Option[Double]
 }
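The per-cube cubeDomain lookup becomes a bulk cubeDomains accessor, so callers that need every cube's domain (such as the new RollupWriteStrategy below) can iterate the whole map at once. A minimal sketch of a hypothetical caller, assuming a TableChanges instance tc (largestDomains is illustrative and not part of this change):

import io.qbeast.core.model.{CubeId, TableChanges}

// Hypothetical helper: return the n cubes with the largest domains using the
// new bulk accessor instead of probing cubes one by one with cubeDomain.
def largestDomains(tc: TableChanges, n: Int): Seq[(CubeId, Double)] =
  tc.cubeDomains.toSeq.sortBy { case (_, domain) => -domain }.take(n)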
@@ -61,7 +61,7 @@ object BroadcastedTableChanges {
       announcedOrReplicatedSet = announcedSet ++ replicatedSet,
       cubeStates = SparkSession.active.sparkContext.broadcast(cubeStates.toMap),
       cubeWeights = SparkSession.active.sparkContext.broadcast(cubeWeights),
-      cubeDomains = SparkSession.active.sparkContext.broadcast(deltaCubeDomains))
+      cubeDomainsBroadcast = SparkSession.active.sparkContext.broadcast(deltaCubeDomains))
   }
 
 }
@@ -74,12 +74,12 @@ case class BroadcastedTableChanges(
     announcedOrReplicatedSet: Set[CubeId],
     cubeStates: Broadcast[Map[CubeId, String]],
     cubeWeights: Broadcast[Map[CubeId, Weight]],
-    cubeDomains: Broadcast[Map[CubeId, Double]])
+    cubeDomainsBroadcast: Broadcast[Map[CubeId, Double]])
     extends TableChanges {
 
   override def cubeWeight(cubeId: CubeId): Option[Weight] = cubeWeights.value.get(cubeId)
 
   override def cubeState(cubeId: CubeId): Option[String] = cubeStates.value.get(cubeId)
 
-  override def cubeDomain(cubeId: CubeId): Option[Double] = cubeDomains.value.get(cubeId)
+  override def cubeDomains: Map[CubeId, Double] = cubeDomainsBroadcast.value
 }
129 changes: 129 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/writer/RollupWriteStrategy.scala
@@ -0,0 +1,129 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.spark.delta.writer

import io.qbeast.core.model.IndexFile
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import io.qbeast.IISeq
import io.qbeast.spark.index.QbeastColumns
import org.apache.spark.sql.catalyst.InternalRow
import scala.collection.mutable
import io.qbeast.core.model.CubeId
import io.qbeast.core.model.TableChanges
import io.qbeast.core.model.Weight

/**
* Implementation of WriteStrategy that groups the records to write by "rolling"
* them up along the index tree.
*
* @param qbeastColumns the Qbeast-specific columns
* @param tableChanges the table changes
*/
private[writer] class RollupWriteStrategy(
qbeastColumns: QbeastColumns,
tableChanges: TableChanges)
extends WriteStrategy
with Serializable {

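// Repartitions the data by the cube column so every row lands in the partition
// of its cube, then writes each partition with the rollup mapping applied and
// collects the written index files together with their stats.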
override def write(
data: DataFrame,
writerFactory: IndexFileWriterFactory): IISeq[(IndexFile, TaskStats)] = {
data
.repartition(col(QbeastColumns.cubeColumnName))
.queryExecution
.executedPlan
.execute()
.mapPartitions(writeRows(writerFactory, targetCubeIds))
.collect()
.toIndexedSeq
}

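// Writes the rows of a single partition, routing each row to the writer of its
// target (rolled-up) cube; one writer, and hence one file, is created per
// target cube seen in the partition.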
private def writeRows(
writerFactory: IndexFileWriterFactory,
targetCubeIds: Map[CubeId, CubeId])(
rows: Iterator[InternalRow]): Iterator[(IndexFile, TaskStats)] = {
val writers: mutable.Map[CubeId, IndexFileWriter] = mutable.Map.empty
rows.foreach { row =>
val cubeId = getCubeId(row)
val targetCubeId = targetCubeIds.getOrElse(cubeId, cubeId)
val writer = writers.getOrElseUpdate(targetCubeId, writerFactory.newWriter())
writer.write(row)
}
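// Closing each writer finalizes its index file and yields the file metadata
// together with the task stats.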
writers.values.iterator.map(_.close())
}

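// Decodes the cube identifier stored in the row's Qbeast cube column.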
private def getCubeId(row: InternalRow): CubeId = {
val bytes = row.getBinary(qbeastColumns.cubeColumnIndex)
tableChanges.updatedRevision.createCubeId(bytes)
}

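// Computes the mapping from every cube to the cube whose file should contain
// its records: cubes whose estimated size is below the desired cube size are
// merged into their ancestors.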
private def targetCubeIds: Map[CubeId, CubeId] = {
val minRowsPerFile = tableChanges.updatedRevision.desiredCubeSize.toDouble
val queue = new mutable.PriorityQueue()(CubeIdOrdering)
val rollups = mutable.Map.empty[CubeId, Rollup]
tableChanges.cubeDomains.foreach { case (cubeId, domain) =>
queue += cubeId
val minWeight = getMinWeight(cubeId).fraction
val maxWeight = getMaxWeight(cubeId).fraction
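// Estimated number of records stored in the cube itself: its domain scaled by
// the fraction of the weight range the cube covers.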
val size = (maxWeight - minWeight) * domain
rollups += cubeId -> Rollup(cubeId, size)
}

while (queue.nonEmpty) {
val cubeId = queue.dequeue()
val rollup = rollups(cubeId)
if (rollup.size < minRowsPerFile) {
cubeId.parent match {
case Some(parentCubeId) =>
val parentRollup = rollups.get(parentCubeId) match {
case Some(value) => value
case None =>
queue += parentCubeId
val value = Rollup(parentCubeId, 0)
rollups += parentCubeId -> value
value
}
parentRollup.append(rollup)
rollups -= cubeId
case None => ()
}
}
}

rollups.flatMap { case (cubeId, rollup) =>
rollup.cubeIds.map((_, cubeId))
}.toMap
}

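// The minimum weight of a cube is the maximum weight of its parent; the root
// cube starts from Weight.MinValue.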
private def getMinWeight(cubeId: CubeId): Weight = {
cubeId.parent match {
case Some(parentCubeId) => getMaxWeight(parentCubeId)
case None => Weight.MinValue
}
}

private def getMaxWeight(cubeId: CubeId): Weight = {
tableChanges.cubeWeight(cubeId).getOrElse(Weight.MaxValue)
}

}

private object CubeIdOrdering extends Ordering[CubeId] {
// Cube identifiers are compared by depth in reversed order.
override def compare(x: CubeId, y: CubeId): Int = y.depth - x.depth
}

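// A mutable group of cubes rolled up together with their accumulated
// estimated size.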
private class Rollup(var cubeIds: Seq[CubeId], var size: Double) {

def append(other: Rollup): Unit = {
cubeIds ++= other.cubeIds
size += other.size
}

}

private object Rollup {
def apply(cubeId: CubeId, size: Double): Rollup = new Rollup(Seq(cubeId), size)
}
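For illustration, a minimal self-contained sketch of the same rollup idea, using plain strings instead of CubeId and none of the Qbeast classes (all names here are hypothetical): with a desired cube size of 1000 and estimated sizes root = 900, c0 = 300 and c1 = 200, both children are merged into the root, so the records of all three cubes end up in a single file of roughly 1400 rows.

import scala.collection.mutable

object RollupSketch extends App {
  val desiredCubeSize = 1000.0

  // Hypothetical three-cube tree: a root cube with two children.
  val parent = Map("c0" -> "root", "c1" -> "root") // the root has no parent
  val depth = Map("root" -> 0, "c0" -> 1, "c1" -> 1)
  val estimatedSize = Map("root" -> 900.0, "c0" -> 300.0, "c1" -> 200.0)

  class Group(var cubes: Seq[String], var size: Double)
  val groups = mutable.Map.empty[String, Group]
  estimatedSize.foreach { case (cube, size) => groups(cube) = new Group(Seq(cube), size) }

  // Visit the deepest cubes first so that undersized children are merged into
  // their parents before the parents themselves are evaluated.
  for (cube <- estimatedSize.keys.toSeq.sortBy(c => -depth(c))) {
    val group = groups(cube)
    if (group.size < desiredCubeSize) {
      parent.get(cube).foreach { p =>
        val parentGroup = groups.getOrElseUpdate(p, new Group(Seq.empty, 0.0))
        parentGroup.cubes ++= group.cubes
        parentGroup.size += group.size
        groups -= cube
      }
    }
  }

  // Mapping from every cube to the cube whose file will hold its records.
  val targets = groups.flatMap { case (cube, group) => group.cubes.map(_ -> cube) }
  println(targets) // e.g. Map(c0 -> root, c1 -> root, root -> root)
}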
@@ -62,7 +62,8 @@ object SparkDeltaDataWriter
       statsTrackers = statsTrackers ++ fileStatsTrackers,
       configuration = serConf)
 
-    val strategy = new LegacyWriteStrategy(tableChanges.updatedRevision, qbeastColumns)
+    // val strategy = new LegacyWriteStrategy(tableChanges.updatedRevision, qbeastColumns)
+    val strategy = new RollupWriteStrategy(qbeastColumns, tableChanges)
 
     val indexFilesAndStats = strategy.write(qbeastData, writerFactory)
     val fileActions = indexFilesAndStats.map(_._1).map(IndexFiles.toAddFile)
