Revert #200 and new version #208

Merged 2 commits on Aug 3, 2023
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,7 +1,7 @@
import Dependencies._
import xerial.sbt.Sonatype._

val mainVersion = "0.3.3"
val mainVersion = "0.4.0"

lazy val qbeastCore = (project in file("core"))
.settings(
16 changes: 16 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
@@ -58,6 +58,22 @@ trait MetadataManager[DataSchema, FileDescriptor] {
*/
def updateTable(tableID: QTableID, tableChanges: TableChanges): Unit

/**
* This function checks if there's a conflict. A conflict happens if there
* are new cubes that have been optimized but were not announced.
*
* @param tableID the table ID
* @param revisionID the revision ID
* @param knownAnnounced the cubes we know were announced when the write operation started.
* @param oldReplicatedSet the old replicated set
* @return true if there is a conflict, false otherwise
*/
def hasConflicts(
tableID: QTableID,
revisionID: RevisionID,
knownAnnounced: Set[CubeId],
oldReplicatedSet: ReplicatedSet): Boolean

/**
* Checks if there's an existing log directory for the table
* @param tableID the table ID
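A minimal sketch of how an implementation might answer this check, assuming the current replicated set can be read from the latest committed snapshot; `loadCurrentReplicatedSet` is a hypothetical helper, and only the `hasConflicts` signature above comes from this PR.

```scala
// Sketch (assumption): a conflict exists when cubes were replicated by a concurrent
// optimization that the writer neither saw as announced nor as already replicated.
def hasConflicts(
    tableID: QTableID,
    revisionID: RevisionID,
    knownAnnounced: Set[CubeId],
    oldReplicatedSet: ReplicatedSet): Boolean = {
  // Hypothetical accessor for the replicated set in the latest committed snapshot.
  val currentReplicatedSet: ReplicatedSet = loadCurrentReplicatedSet(tableID, revisionID)
  // Cubes replicated since the write operation started...
  val deltaReplicatedSet = currentReplicatedSet -- oldReplicatedSet
  // ...minus those the writer already knew were announced: anything left is a conflict.
  (deltaReplicatedSet -- knownAnnounced).nonEmpty
}
```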
26 changes: 13 additions & 13 deletions core/src/main/scala/io/qbeast/core/model/QbeastBlock.scala
@@ -2,24 +2,24 @@ package io.qbeast.core.model

/**
* Container class for Qbeast file's metadata
*
* @param path the file path
* @param cube the cube identifier
* @param revision the revision identifier
* @param minWeight the minimum weight of element
* @param maxWeight the maximum weight of element
* @param replicated the file is replicated
* @param elementCount the number of elements
* @param size the size in bytes
* @param modificationTime the modification timestamp
* @param path
* @param cube
* @param revision
* @param minWeight
* @param maxWeight
* @param state
* @param elementCount
* @param size
* @param modificationTime
*/

case class QbeastBlock(
path: String,
cube: String,
revision: Long,
minWeight: Weight,
maxWeight: Weight,
replicated: Boolean,
state: String,
elementCount: Long,
size: Long,
modificationTime: Long)
@@ -30,7 +30,7 @@
object QbeastBlock {

private val metadataKeys =
Set("minWeight", "maxWeight", "replicated", "revision", "elementCount", "cube")
Set("minWeight", "maxWeight", "state", "revision", "elementCount", "cube")

private def checkBlockMetadata(blockMetadata: Map[String, String]): Unit = {
metadataKeys.foreach(key =>
@@ -60,7 +60,7 @@ object QbeastBlock {
blockMetadata("revision").toLong,
Weight(blockMetadata("minWeight").toInt),
Weight(blockMetadata("maxWeight").toInt),
blockMetadata("replicated").toBoolean,
blockMetadata("state"),
blockMetadata("elementCount").toLong,
size,
modificationTime)
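A small usage sketch of the new string-based state: the `QbeastBlock(path, blockMetadata, size, modificationTime)` factory shown above parses the tags, with `state` now carried as a `String` instead of the old `replicated: Boolean` flag. The tag values below are illustrative.

```scala
// Illustrative only: building a QbeastBlock from file metadata tags.
val blockMetadata = Map(
  "cube" -> "w",
  "minWeight" -> "0",
  "maxWeight" -> "1000",
  "state" -> "FLOODED", // previously "replicated" -> "true"/"false"
  "revision" -> "1",
  "elementCount" -> "10000")

val block = QbeastBlock("path/to/file.parquet", blockMetadata, 2048L, 1691000000000L)
assert(block.state == "FLOODED")
assert(block.minWeight == Weight(0))
```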
@@ -1,11 +1,6 @@
package io.qbeast.core.model

import io.qbeast.core.transform.{
HashTransformation,
LinearTransformation,
Transformation,
Transformer
}
import io.qbeast.core.transform.{HashTransformation, LinearTransformation, Transformation, Transformer}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

@@ -9,15 +9,15 @@ class QbeastBlockTest extends AnyFlatSpec with Matchers {
"minWeight" -> "19217",
"cube" -> "",
"maxWeight" -> "11111111",
"replicated" -> "true",
"state" -> "FlOODED",
"revision" -> "1",
"elementCount" -> "777")

val qbeastBlock = QbeastBlock("path", blockMetadata, 0L, 0L)
qbeastBlock.cube shouldBe ""
qbeastBlock.minWeight shouldBe Weight(19217)
qbeastBlock.maxWeight shouldBe Weight(11111111)
qbeastBlock.replicated shouldBe true
qbeastBlock.state shouldBe "FlOODED"
qbeastBlock.revision shouldBe 1
qbeastBlock.elementCount shouldBe 777
}
@@ -32,7 +32,7 @@ class QbeastBlockTest extends AnyFlatSpec with Matchers {
"minWeight" -> "19217",
"cube" -> "",
"maxWeight" -> "11111111",
"replicated" -> "false",
"state" -> "FlOODED",
"revision" -> "bad_type",
"elementCount" -> "777")

4 changes: 2 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/CubeDataLoader.scala
@@ -4,7 +4,7 @@
package io.qbeast.spark.delta

import io.qbeast.core.model.{CubeId, QTableID, Revision}
import io.qbeast.spark.utils.TagColumns
import io.qbeast.spark.utils.{State, TagColumns}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -64,7 +64,7 @@ case class CubeDataLoader(tableID: QTableID) {
.where(
TagColumns.revision === lit(revision.revisionID.toString) &&
TagColumns.cube === lit(cube.string) &&
TagColumns.replicated === lit(false.toString()))
TagColumns.state != lit(State.ANNOUNCED))
.collect()

val fileNames = cubeBlocks.map(f => new Path(tableID.id, f.path).toString)
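One remark on the new filter: if the intent is a column-level comparison, Spark's `Column` API expresses inequality with `=!=`; a plain `!=` resolves to object inequality on the `Column` instance and yields a constant `Boolean`, so the state condition is effectively dropped from the predicate. A hedged sketch of the presumably intended predicate, using the same `revision` and `cube` parameters as in `CubeDataLoader` above:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit
import io.qbeast.spark.utils.{State, TagColumns}

// Presumed intent (assumption): keep blocks of this cube and revision whose state
// tag is not ANNOUNCED. `=!=` builds a Column predicate; `!=` compares the Column
// object itself and degenerates to a constant Boolean.
val nonAnnouncedBlocks: Column =
  TagColumns.revision === lit(revision.revisionID.toString) &&
    TagColumns.cube === lit(cube.string) &&
    TagColumns.state =!= lit(State.ANNOUNCED)
```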
31 changes: 30 additions & 1 deletion src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -44,6 +44,24 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
}
}

/**
* Constructs the replicated set for each revision
*
* @return a map from revision identifier to replicated set
*/
private val replicatedSetsMap: Map[RevisionID, ReplicatedSet] = {
val listReplicatedSets = metadataMap.filterKeys(_.startsWith(MetadataConfig.replicatedSet))

listReplicatedSets.map { case (key: String, json: String) =>
val revisionID = key.split('.').last.toLong
val revision = getRevision(revisionID)
val replicatedSet = mapper
.readValue[Set[String]](json, classOf[Set[String]])
.map(revision.createCubeId)
(revisionID, replicatedSet)
}
}

/**
* Returns last available revision identifier
*
@@ -65,6 +83,16 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
throw AnalysisExceptionFactory.create(s"No space revision available with $revisionID"))
}

/**
* Returns the replicated set for a revision identifier, if it exists
* @param revisionID the revision identifier
* @return the replicated set
*/
private def getReplicatedSet(revisionID: RevisionID): ReplicatedSet = {
replicatedSetsMap
.getOrElse(revisionID, Set.empty)
}

/**
* Returns true if a revision with a specific revision identifier exists
*
@@ -92,7 +120,8 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
*/
override def loadIndexStatus(revisionID: RevisionID): IndexStatus = {
val revision = getRevision(revisionID)
new IndexStatusBuilder(this, revision).build()
val replicatedSet = getReplicatedSet(revisionID)
new IndexStatusBuilder(this, revision, replicatedSet).build()
}

/**
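For context, a sketch of the table metadata these helpers read. Assumptions: `MetadataConfig.replicatedSet` expands to the `qbeast.replicatedSet` key prefix and the value is a JSON array of cube-id string representations (both inferred from the parsing code above); the concrete cube strings are illustrative.

```scala
// Assumed (illustrative) shape of one configuration entry per revision.
val configuration: Map[String, String] = Map(
  "qbeast.replicatedSet.1" -> """["", "w", "wQ"]""")

// Reading it back, as replicatedSetsMap does: the key suffix selects the revision,
// and the JSON array is mapped back to CubeIds via revision.createCubeId.
val revisionID = "qbeast.replicatedSet.1".split('.').last.toLong // 1L
val cubeStrings = mapper
  .readValue[Set[String]](configuration("qbeast.replicatedSet.1"), classOf[Set[String]])
```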
51 changes: 13 additions & 38 deletions src/main/scala/io/qbeast/spark/delta/IndexStatusBuilder.scala
@@ -5,15 +5,11 @@ package io.qbeast.spark.delta

import io.qbeast.core.model._
import io.qbeast.spark.delta.QbeastMetadataSQL._
import io.qbeast.spark.utils.State.FLOODED
import io.qbeast.spark.utils.TagColumns
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.min
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.functions.{col, collect_list, lit, min, sum}
import org.apache.spark.sql.{Dataset, SparkSession}

import scala.collection.immutable.SortedMap

@@ -24,7 +20,11 @@
* @param announcedSet the announced set available for the revision
* @param replicatedSet the replicated set available for the revision
*/
private[delta] class IndexStatusBuilder(qbeastSnapshot: DeltaQbeastSnapshot, revision: Revision)
private[delta] class IndexStatusBuilder(
qbeastSnapshot: DeltaQbeastSnapshot,
revision: Revision,
replicatedSet: ReplicatedSet,
announcedSet: Set[CubeId] = Set.empty)
extends Serializable
with StagingUtils {

@@ -37,19 +37,15 @@ private[delta] class IndexStatusBuilder(qbeastSnapshot: DeltaQbeastSnapshot, rev
qbeastSnapshot.loadRevisionBlocks(revision.revisionID)

def build(): IndexStatus = {
val cubeStatuses =
val cubeStatus =
if (isStaging(revision)) stagingCubeStatuses
else buildCubesStatuses

val (replicatedSet, announcedSet): (Set[CubeId], Set[CubeId]) =
if (isStaging(revision)) (Set.empty, Set.empty)
else buildReplicatedAndAnnouncedSets(cubeStatuses)

IndexStatus(
revision = revision,
replicatedSet = replicatedSet,
announcedSet = announcedSet,
cubesStatuses = cubeStatuses)
cubesStatuses = cubeStatus)
}

def stagingCubeStatuses: SortedMap[CubeId, CubeStatus] = {
@@ -64,7 +60,7 @@ private[delta] class IndexStatusBuilder(qbeastSnapshot: DeltaQbeastSnapshot, rev
revision.revisionID,
Weight.MinValue,
maxWeight,
false,
FLOODED,
0,
addFile.size,
addFile.modificationTime))
@@ -95,34 +91,13 @@ private[delta] class IndexStatusBuilder(qbeastSnapshot: DeltaQbeastSnapshot, rev
.select(
createCube(col("cube"), lit(ndims)).as("cubeId"),
col("maxWeight"),
normalizeWeight(col("maxWeight"), col("elementCount"), lit(rev.desiredCubeSize))
.as("normalizedWeight"),
normalizeWeight(col("maxWeight"), col("elementCount"), lit(rev.desiredCubeSize)).as(
"normalizedWeight"),
col("files"))
.as[CubeStatus]
.collect()
.foreach(row => builder += row.cubeId -> row)
builder.result()
}

def buildReplicatedAndAnnouncedSets(
cubeStatuses: Map[CubeId, CubeStatus]): (Set[CubeId], Set[CubeId]) = {
val replicatedSet = Set.newBuilder[CubeId]
val announcedSet = Set.newBuilder[CubeId]
cubeStatuses.foreach { case (id, status) =>
var hasReplicated = false
var hasUnreplicated = false
status.files.foreach(file =>
if (file.replicated) hasReplicated = true
else hasUnreplicated = true)
if (hasReplicated) {
if (hasUnreplicated) {
announcedSet += id
} else {
replicatedSet += id
}
}
}
(replicatedSet.result(), announcedSet.result())
}

}
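With the derivation above removed, the builder no longer reconstructs the replicated and announced sets from per-file tags; both are supplied when the builder is created, the replicated set coming from table metadata via the snapshot. A hedged sketch of a caller using the new `announcedSet` default parameter; `announcedCubes` is hypothetical, and the only call site in this PR (`loadIndexStatus`) passes the replicated set alone.

```scala
// Hypothetical caller: supply cubes announced by an in-flight optimization so the
// resulting IndexStatus reflects them; replicatedSet comes from table metadata.
val status: IndexStatus =
  new IndexStatusBuilder(qbeastSnapshot, revision, replicatedSet, announcedCubes).build()
```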
37 changes: 35 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala
@@ -3,7 +3,7 @@
*/
package io.qbeast.spark.delta

import io.qbeast.core.model.{Revision, StagingUtils, TableChanges, mapper}
import io.qbeast.core.model.{ReplicatedSet, Revision, StagingUtils, TableChanges, mapper}
import io.qbeast.spark.utils.MetadataConfig
import io.qbeast.spark.utils.MetadataConfig.{lastRevisionID, revision}
import org.apache.spark.sql.SparkSession
@@ -36,10 +36,38 @@ private[delta] class QbeastMetadataOperation extends ImplicitMetadataOperation w
}
}

/**
* Updates the Delta Metadata Configuration with the new replicated set
* for the given revision
* @param baseConfiguration Delta Metadata Configuration
* @param revision the new revision to persist
* @param deltaReplicatedSet the new set of replicated cubes
*/

private def updateQbeastReplicatedSet(
baseConfiguration: Configuration,
revision: Revision,
deltaReplicatedSet: ReplicatedSet): Configuration = {

val revisionID = revision.revisionID
assert(baseConfiguration.contains(s"${MetadataConfig.revision}.$revisionID"))

val newReplicatedSet = deltaReplicatedSet.map(_.string)
// Save the replicated set of cube id's as String representation

baseConfiguration.updated(
s"${MetadataConfig.replicatedSet}.$revisionID",
mapper.writeValueAsString(newReplicatedSet))

}

private def overwriteQbeastConfiguration(baseConfiguration: Configuration): Configuration = {
val revisionKeys = baseConfiguration.keys.filter(_.startsWith(MetadataConfig.revision))
val replicatedSetKeys = {
baseConfiguration.keys.filter(_.startsWith(MetadataConfig.replicatedSet))
}
val other = baseConfiguration.keys.filter(_ == MetadataConfig.lastRevisionID)
val qbeastKeys = revisionKeys ++ other
val qbeastKeys = revisionKeys ++ replicatedSetKeys ++ other
baseConfiguration -- qbeastKeys
}

@@ -122,6 +150,11 @@ private[delta] class QbeastMetadataOperation extends ImplicitMetadataOperation w
val configuration =
if (isNewRevision || isOverwriteMode) {
updateQbeastRevision(baseConfiguration, latestRevision)
} else if (isOptimizeOperation) {
updateQbeastReplicatedSet(
baseConfiguration,
latestRevision,
tableChanges.announcedOrReplicatedSet)
} else baseConfiguration

if (txn.readVersion == -1) {
5 changes: 3 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/QbeastMetadataSQL.scala
@@ -30,8 +30,9 @@ object QbeastMetadataSQL {
col("size"),
col("modificationTime"),
weight(TagColumns.minWeight).as("minWeight"),
weight(TagColumns.maxWeight).as("maxWeight"),
TagColumns.replicated.cast("boolean").as("replicated"),
weight(TagColumns.maxWeight)
.as("maxWeight"),
TagColumns.state,
TagColumns.revision.cast("bigint").as("revision"),
TagColumns.elementCount.cast("bigint").as("elementCount"))

5 changes: 2 additions & 3 deletions src/main/scala/io/qbeast/spark/delta/ReplicatedFile.scala
@@ -3,7 +3,7 @@
*/
package io.qbeast.spark.delta

import io.qbeast.spark.utils.TagUtils
import io.qbeast.spark.utils.{State, TagUtils}
import org.apache.spark.sql.delta.actions.AddFile

/**
@@ -12,8 +12,7 @@ import org.apache.spark.sql.delta.actions.AddFile
object ReplicatedFile {

def apply(addFile: AddFile): AddFile = {
val newTags = addFile.tags
.updated(TagUtils.replicated, true.toString())
val newTags = addFile.tags.updated(TagUtils.state, State.REPLICATED)
addFile.copy(tags = newTags, modificationTime = System.currentTimeMillis())
}

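A brief usage sketch: during replication, each copied file's `AddFile` action gets its state tag set to `REPLICATED` plus a fresh modification time, which is what the snapshot later reads back as the block state. The variable names below are illustrative.

```scala
// Illustrative: mark the AddFile actions produced by a replication as REPLICATED.
val replicatedActions: Seq[AddFile] = cubeFiles.map(ReplicatedFile(_))
// Each action now carries tags(TagUtils.state) == State.REPLICATED.
```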