Skip to content

Commit

Permalink
Use update() for retrieving Snapshot and add Timestamp_NTZ type
Browse files Browse the repository at this point in the history
  • Loading branch information
osopardo1 authored and osopardo1 committed Aug 30, 2023
1 parent 0f344fa commit a6cd970
Show file tree
Hide file tree
Showing 26 changed files with 161 additions and 129 deletions.
6 changes: 6 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/QDataType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ object TimestampDataType extends OrderedDataType {

}

object TimestampNTZType extends OrderedDataType {
override def name: String = "TimestampNTZType"
override val ordering: Numeric[Any] = implicitly[Numeric[Long]].asInstanceOf[Numeric[Any]]

}

object DateDataType extends OrderedDataType {
override def name: String = "DateDataType"
override val ordering: Numeric[Any] = implicitly[Numeric[Long]].asInstanceOf[Numeric[Any]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ case class LinearTransformation(

/**
* Merges two transformations. The domain of the resulting transformation is the union of this
* and the other transformation. The range of the resulting transformation is the intersection of
* this and the other transformation, which can be a LinearTransformation or IdentityTransformation
* and the other transformation. The range of the resulting transformation
* is the intersection of this and the other transformation,
* which can be a LinearTransformation or IdentityTransformation
* @param other
* @return a new Transformation that contains both this and other.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/qbeast/spark/QbeastTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class QbeastTable private (
private def deltaLog: DeltaLog = DeltaLog.forTable(sparkSession, tableID.id)

private def qbeastSnapshot: DeltaQbeastSnapshot =
delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
delta.DeltaQbeastSnapshot(deltaLog.update())

private def indexedTable: IndexedTable = indexedTableFactory.getIndexedTable(tableID)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ private[delta] case class DeltaMetadataWriter(

val cubeStrings = deltaReplicatedSet.map(_.string)
val cubeBlocks =
deltaLog.unsafeVolatileSnapshot.allFiles
deltaLog
.update()
.allFiles
.where(TagColumns.revision === lit(revision.revisionID.toString) &&
TagColumns.cube.isInCollection(cubeStrings))
.collect()
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ object OTreeIndex {

def apply(spark: SparkSession, path: Path): OTreeIndex = {
val deltaLog = DeltaLog.forTable(spark, path)
val unsafeVolatileSnapshot = deltaLog.unsafeVolatileSnapshot
val unsafeVolatileSnapshot = deltaLog.update()
val tahoe = TahoeLogFileIndex(spark, deltaLog, path, unsafeVolatileSnapshot, Seq.empty, false)
OTreeIndex(tahoe)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ case class ConvertToQbeastCommand(
val (fileFormat, tableId) = resolveTableFormat(spark)

val deltaLog = DeltaLog.forTable(spark, tableId.table)
val unsafeVolatileSnapshot = deltaLog.unsafeVolatileSnapshot
val unsafeVolatileSnapshot = deltaLog.update()
val qbeastSnapshot = DeltaQbeastSnapshot(unsafeVolatileSnapshot)
val isQbeast = qbeastSnapshot.loadAllRevisions.nonEmpty

Expand All @@ -79,7 +79,7 @@ case class ConvertToQbeastCommand(

// Convert delta to qbeast through metadata modification
val tableID = QTableID(tableId.table)
val schema = deltaLog.unsafeVolatileSnapshot.schema
val schema = deltaLog.update().schema

SparkDeltaMetadataManager.updateMetadataWithTransaction(tableID, schema) {
val convRevision = stagingRevision(tableID, cubeSize, columnsToIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class QbeastWriteBuilder(
// Passing the options in the query plan plus the properties
// because columnsToIndex needs to be included in the contract
val writeOptions = info.options().toMap ++ properties
// scalastyle:off
println("data schema " + data.schema)
indexedTable.save(data, writeOptions, append)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ object SparkToQTypesUtils {
case _: DecimalType => qmodel.DecimalDataType
case _: TimestampType => qmodel.TimestampDataType
case _: DateType => qmodel.DateDataType
case _: TimestampNTZType => qmodel.TimestampNTZType
case _ => throw new RuntimeException(s"${sparkType.typeName} is not supported yet")
// TODO add more types
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait DeltaStatsCollectionUtils {
val deltaLog = DeltaLog.forTable(sparkSession, tableID.id)
val metadata = deltaLog.unsafeVolatileMetadata
val outputPath = deltaLog.dataPath
val deltaProtocol = deltaLog.unsafeVolatileSnapshot.protocol
val deltaProtocol = deltaLog.update().protocol

val indexedCols = DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(metadata)

Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/io/qbeast/docs/DocumentationTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class DocumentationTests extends QbeastIntegrationTestSpec {
val qbeast_df = spark.read.format("qbeast").load(qbeast_table_path)

val deltaLog = DeltaLog.forTable(spark, qbeast_table_path)
val totalNumberOfFiles = deltaLog.unsafeVolatileSnapshot.allFiles.count()
val totalNumberOfFiles = deltaLog.update().allFiles.count()

totalNumberOfFiles should be > 1L withClue
"Total number of files in pushdown notebook changes to " + totalNumberOfFiles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class IndexStatusBuilderTest extends QbeastIntegrationTestSpec {

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val indexStatus =
DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot).loadLatestIndexStatus
DeltaQbeastSnapshot(deltaLog.update()).loadLatestIndexStatus

indexStatus.revision.revisionID shouldBe 1
indexStatus.cubesStatuses.foreach(_._2.files.size shouldBe 1)
Expand All @@ -44,7 +44,7 @@ class IndexStatusBuilderTest extends QbeastIntegrationTestSpec {
.save(tmpDir)
val deltaLog = DeltaLog.forTable(spark, tmpDir)
val firstIndexStatus =
DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot).loadLatestIndexStatus
DeltaQbeastSnapshot(deltaLog.update()).loadLatestIndexStatus
data.write
.format("qbeast")
.mode("append")
Expand Down
42 changes: 10 additions & 32 deletions src/test/scala/io/qbeast/spark/delta/OTreeIndexTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,11 @@ class OTreeIndexTest extends QbeastIntegrationTestSpec {

val deltaLog = DeltaLog.forTable(spark, tmpdir)
val tahoeFileIndex = {
TahoeLogFileIndex(
spark,
deltaLog,
deltaLog.dataPath,
deltaLog.unsafeVolatileSnapshot,
Seq.empty,
false)
TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, deltaLog.update(), Seq.empty, false)
}
val oTreeIndex = new OTreeIndexTest(tahoeFileIndex)

val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles.collect().map(_.path)
val allFiles = deltaLog.update().allFiles.collect().map(_.path)

val matchFiles = oTreeIndex.matchingBlocks(Seq.empty, Seq.empty).map(_.path)

Expand All @@ -79,17 +73,13 @@ class OTreeIndexTest extends QbeastIntegrationTestSpec {

val deltaLog = DeltaLog.forTable(spark, tmpdir)
val tahoeFileIndex = {
TahoeLogFileIndex(
spark,
deltaLog,
deltaLog.dataPath,
deltaLog.unsafeVolatileSnapshot,
Seq.empty,
false)
TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, deltaLog.update(), Seq.empty, false)
}
val oTreeIndex = new OTreeIndexTest(tahoeFileIndex)

oTreeIndex.inputFiles shouldBe deltaLog.unsafeVolatileSnapshot.allFiles
oTreeIndex.inputFiles shouldBe deltaLog
.update()
.allFiles
.collect()
.map(file => new Path(deltaLog.dataPath, file.path).toString)
})
Expand All @@ -107,16 +97,10 @@ class OTreeIndexTest extends QbeastIntegrationTestSpec {

val deltaLog = DeltaLog.forTable(spark, tmpdir)
val tahoeFileIndex = {
TahoeLogFileIndex(
spark,
deltaLog,
deltaLog.dataPath,
deltaLog.unsafeVolatileSnapshot,
Seq.empty,
false)
TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, deltaLog.update(), Seq.empty, false)
}
val oTreeIndex = new OTreeIndexTest(tahoeFileIndex)
val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles.collect().map(_.path)
val allFiles = deltaLog.update().allFiles.collect().map(_.path)

oTreeIndex.matchingBlocks(Seq.empty, Seq.empty).map(_.path).toSet shouldBe allFiles.toSet
})
Expand All @@ -133,17 +117,11 @@ class OTreeIndexTest extends QbeastIntegrationTestSpec {

val deltaLog = DeltaLog.forTable(spark, tmpdir)
val tahoeFileIndex = {
TahoeLogFileIndex(
spark,
deltaLog,
deltaLog.dataPath,
deltaLog.unsafeVolatileSnapshot,
Seq.empty,
false)
TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, deltaLog.update(), Seq.empty, false)
}
val oTreeIndex = new OTreeIndexTest(tahoeFileIndex)

val sizeInBytes = deltaLog.unsafeVolatileSnapshot.allFiles.collect().map(_.size).sum
val sizeInBytes = deltaLog.update().allFiles.collect().map(_.size).sum
oTreeIndex.sizeInBytes shouldBe sizeInBytes
})

Expand Down
10 changes: 5 additions & 5 deletions src/test/scala/io/qbeast/spark/delta/QbeastSnapshotTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class QbeastSnapshotTest extends QbeastIntegrationTestSpec {
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.update())
val indexStatus = qbeastSnapshot.loadLatestIndexStatus
val revision = indexStatus.revision

Expand All @@ -62,7 +62,7 @@ class QbeastSnapshotTest extends QbeastIntegrationTestSpec {
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.update())
val columnTransformers = SparkRevisionFactory
.createNewRevision(QTableID(tmpDir), df.schema, options)
.columnTransformers
Expand Down Expand Up @@ -92,7 +92,7 @@ class QbeastSnapshotTest extends QbeastIntegrationTestSpec {
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.update())
val timestamp = System.currentTimeMillis()
qbeastSnapshot.loadRevisionAt(timestamp) shouldBe qbeastSnapshot.loadLatestRevision

Expand All @@ -117,7 +117,7 @@ class QbeastSnapshotTest extends QbeastIntegrationTestSpec {
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.update())
an[AnalysisException] shouldBe thrownBy(
qbeastSnapshot.loadRevisionAt(invalidRevisionTimestamp))

Expand All @@ -140,7 +140,7 @@ class QbeastSnapshotTest extends QbeastIntegrationTestSpec {
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.update())
val builder =
new IndexStatusBuilder(
qbeastSnapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class AnalyzeAndOptimizeTest
qbeastTable.optimize()

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val replicatedCubes = qbeastSnapshot.loadLatestIndexStatus.replicatedSet

val announcedCubes = qbeastTable.analyze()
Expand All @@ -72,7 +72,7 @@ class AnalyzeAndOptimizeTest
val announcedCubes = qbeastTable.analyze()
qbeastTable.optimize()
val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val replicatedCubes =
qbeastSnapshot.loadLatestIndexStatus.replicatedSet.map(_.string)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CubeWeightsIntegrationTest extends QbeastIntegrationTestSpec with PrivateM
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val commitLogWeightMap = qbeastSnapshot.loadLatestIndexStatus.cubesStatuses

// commitLogWeightMap shouldBe weightMap
Expand All @@ -62,7 +62,7 @@ class CubeWeightsIntegrationTest extends QbeastIntegrationTestSpec with PrivateM
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val cubeWeights = qbeastSnapshot.loadLatestIndexStatus.cubesStatuses

cubeWeights.values.foreach { case CubeStatus(_, weight, _, _) =>
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/io/qbeast/spark/index/DataStagingTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class DataStagingTest

def getQbeastSnapshot(spark: SparkSession, dir: String): DeltaQbeastSnapshot = {
val deltaLog = DeltaLog.forTable(spark, dir)
DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
DeltaQbeastSnapshot(deltaLog.update())
}

private val getCurrentStagingSize: PrivateMethod[Long] =
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/io/qbeast/spark/index/IndexTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class IndexTest
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())

val offset = 0.5
val appendData = df
Expand Down Expand Up @@ -190,7 +190,7 @@ class IndexTest

val deltaLog = DeltaLog.forTable(spark, tmpDir)

deltaLog.unsafeVolatileSnapshot.allFiles.collect() foreach (f =>
deltaLog.update().allFiles.collect() foreach (f =>
{
val cubeId = CubeId(2, f.tags("cube"))
cubeId.parent match {
Expand Down
18 changes: 9 additions & 9 deletions src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class NewRevisionTest
spaceMultipliers.foreach(i => appendNewRevision(spark, tmpDir, i))

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val spaceRevisions = qbeastSnapshot.loadAllRevisions

// Including the staging revision
Expand All @@ -55,7 +55,7 @@ class NewRevisionTest
appendNewRevision(spark, tmpDir, 3)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())

val revisions = qbeastSnapshot.loadAllRevisions
val allWM =
Expand Down Expand Up @@ -88,7 +88,7 @@ class NewRevisionTest
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())

qbeastSnapshot.loadLatestRevision.desiredCubeSize shouldBe cubeSize

Expand Down Expand Up @@ -121,7 +121,7 @@ class NewRevisionTest
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())

// Including the staging revision
qbeastSnapshot.loadAllRevisions.size shouldBe 3
Expand Down Expand Up @@ -150,7 +150,7 @@ class NewRevisionTest
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val transformation = qbeastSnapshot.loadLatestRevision.transformations.head

qbeastSnapshot.loadLatestRevision.revisionID shouldBe 1
Expand Down Expand Up @@ -182,7 +182,7 @@ class NewRevisionTest
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val transformation = qbeastSnapshot.loadLatestRevision.transformations.head

qbeastSnapshot.loadLatestRevision.revisionID shouldBe 1
Expand Down Expand Up @@ -215,7 +215,7 @@ class NewRevisionTest
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val revision = qbeastSnapshot.loadLatestRevision
val transformation = revision.transformations.head

Expand Down Expand Up @@ -253,7 +253,7 @@ class NewRevisionTest
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val allRevisions = qbeastSnapshot.loadAllRevisions.sortBy(_.revisionID)

val firstWriteTransformation =
Expand Down Expand Up @@ -303,7 +303,7 @@ class NewRevisionTest
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.unsafeVolatileSnapshot)
val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.update())
val allRevisions = qbeastSnapshot.loadAllRevisions.sortBy(_.revisionID)

val firstWriteTransformation =
Expand Down
Loading

0 comments on commit a6cd970

Please sign in to comment.