Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

207 rollup compaction simplified #232

Merged
68 changes: 68 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/Block.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.core.model

/**
* Block of elements stored in the physical index file.
*
* @constructor creates a new instance for given attributes.
* @param owner the physical file the block belongs to
* @param revisionId the revision identifier
* @param cubeId the cube identifier
* @param minWeight the minimum element weight
* @param maxWeight the maximum element weight
* @param replicated the block is replicated
*/
final class Block private[model] (
private[model] var owner: Option[IndexFile],
val cubeId: CubeId,
val minWeight: Weight,
val maxWeight: Weight,
val elementCount: Long,
val replicated: Boolean)
extends Serializable {

/**
* Returns the file.
*
* @return the file
*/
def file: IndexFile = owner.get

/**
* Replicates the block.
*
* @return the copy of the block with the replicated attribute set to true
*/
def replicate(): Block =
if (replicated) this else new Block(owner, cubeId, minWeight, maxWeight, elementCount, true)

override def equals(obj: Any): Boolean = obj match {
case other: Block => (
owner.map(_.path) == other.owner.map(_.path)
&& cubeId == other.cubeId
&& minWeight == other.minWeight
&& maxWeight == other.maxWeight
&& elementCount == other.elementCount
&& replicated == other.replicated
)
case _ => false

Check warning on line 50 in core/src/main/scala/io/qbeast/core/model/Block.scala

View check run for this annotation

Codecov / codecov/patch

core/src/main/scala/io/qbeast/core/model/Block.scala#L50

Added line #L50 was not covered by tests
}

override def hashCode(): Int = {
val prime = 31
var result = 1
result = prime * result + owner.map(_.path).hashCode()
result = prime * result + cubeId.hashCode()
result = prime * result + minWeight.hashCode()
result = prime * result + maxWeight.hashCode()
result = prime * result + elementCount.hashCode()
result = prime * result + replicated.hashCode()
result
}

override def toString(): String =
s"Block(${owner.map(_.path)}, ${cubeId}, ${minWeight}, ${maxWeight}, ${elementCount}, ${replicated})"

}
6 changes: 4 additions & 2 deletions core/src/main/scala/io/qbeast/core/model/DataWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ trait DataWriter[DATA, DataSchema, FileDescriptor] {
* Compact the files
* @param tableID the table identifier
* @param schema the schema of the data
* @param revision the revision of the index
* @param indexStatus the current index status
* @param tableChanges the current table changes
* @param indexFiles the index files to compact
* @return the sequence of files written and deleted
*/
def compact(
tableID: QTableID,
schema: DataSchema,
revision: Revision,
indexStatus: IndexStatus,
tableChanges: TableChanges): IISeq[FileDescriptor]
indexFiles: Seq[IndexFile]): IISeq[FileDescriptor]

}
61 changes: 61 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/IndexFile.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.core.model

import io.qbeast.IISeq

/**
* Index file represents a physical file where blocks of the elements are
* stored.
*
* @param path the file path
* @param size the file size in bytes
* @param modificationTime the last modification timestamp
* @param blocks the blocks
*/
final class IndexFile private[model] (
val path: String,
val size: Long,
val modificationTime: Long,
val revisionId: RevisionID,
val blocks: IISeq[Block])
extends Serializable {

/**
* The number of elements in the file
*
* @return the number of elements
*/
def elementCount = blocks.map(_.elementCount).sum

/**
* Returns whether file contains data from a given cube.
*
* @param cubeId the cube identifier
* @return the file contains data of the cube
*/
def hasCubeData(cubeId: CubeId): Boolean = blocks.exists(_.cubeId == cubeId)

/**
* Tries to replicate the blocks that belong to the specified cubes
*
* @param the cube identifiers
* @return an instance with corresponding blocks replicated or None if there
* are no such blocks
*/
def tryReplicateBlocks(cubeIds: Set[CubeId]): Option[IndexFile] = {
if (!blocks.exists(block => cubeIds.contains(block.cubeId))) {
return None
}
val newModificationTime = System.currentTimeMillis()
val newBlocks = blocks.map { block =>
if (cubeIds.contains(block.cubeId)) block.replicate() else block
}
Some(new IndexFile(path, size, newModificationTime, revisionId, newBlocks))
}

override def toString(): String =
s"IndexFile(${path}, ${size}, ${modificationTime}, ${revisionId}, ${blocks})"

}
194 changes: 194 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/IndexFileBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.core.model

import scala.collection.immutable

/**
* Builder for creating IndexFile instances.
*/
final class IndexFileBuilder {
private var path: Option[String] = None
private var size: Long = 0L
private var modificationTime: Long = 0L
private var revisionId: RevisionID = 0L
private val blocks = immutable.Seq.newBuilder[Block]

/**
* Sets the path.
*
* @param path the path to set
* @return this instance
*/
def setPath(path: String): IndexFileBuilder = {
this.path = Some(path)
this
}

/**
* Sets the size in bytes.
*
* @param size the size in bytes
* @return this instance
*/
def setSize(size: Long): IndexFileBuilder = {
this.size = size
this
}

/**
* Sets the modification time
*
* @param modificationTime the modification time to set
* @return this instance
*/
def setModificationTime(modificationTime: Long): IndexFileBuilder = {
this.modificationTime = modificationTime
this
}

/**
* Sets the revision identifier.
*
* @param revisionId the revision identifier to set
* @return this instance
*/
def setRevisionId(revisionId: RevisionID): IndexFileBuilder = {
this.revisionId = revisionId
this
}

/**
* Begins a new block.
*
* @return a new block builder
*/
def beginBlock(): IndexFileBuilder.BlockBuilder = new IndexFileBuilder.BlockBuilder(this)

private def addBlock(block: Block): IndexFileBuilder = {
blocks += block
this
}

/**
* Builds th result.
*
* @return the index file
*/
def result(): IndexFile = {
val file = new IndexFile(path.get, size, modificationTime, revisionId, blocks.result())
file.blocks.foreach(_.owner = Some(file))
file
}

}

/**
* IndexFileBuilder companion object.
*/
object IndexFileBuilder {

/**
* Builder for creating Blockk instances.
*/
final class BlockBuilder private[IndexFileBuilder] (owner: IndexFileBuilder) {
private var cubeId: Option[CubeId] = None
private var minWeight: Option[Weight] = None
private var maxWeight: Option[Weight] = None
private var elementCount: Long = 0L
private var replicated: Boolean = false

/**
* Sets the cube identifier.
*
* @param the cube identifier to set
* @return this instance
*/
def setCubeId(cubeId: CubeId): BlockBuilder = {
this.cubeId = Some(cubeId)
this
}

/**
* Sets the minimum weight.
*
* @param minWeight the minimum weight to set
* @return this instance
*/
def setMinWeight(minWeight: Weight): BlockBuilder = {
this.minWeight = Some(minWeight)
this
}

/**
* Updates the minimum weight by setting the value to minimum between the
* previous value and the specified one.
*
* @param minWeight the minimum weight to set
* @return this instance
*/
def updateMinWeight(minWeight: Weight): BlockBuilder = {
this.minWeight = this.minWeight match {
case Some(value) => Some(Weight.min(minWeight, value))
case None => Some(minWeight)
}
this
}

/**
* Sets the maximum weight.
*
* @param maxWeight the maximum weight to set
* @return this instance
*/
def setMaxWeight(maxWeight: Weight): BlockBuilder = {
this.maxWeight = Some(maxWeight)
this
}

/**
* Sets the element count.
*
* @param elemenCount the elemenCount to set
* @return this instance
*/
def setElementCount(elementCount: Long): BlockBuilder = {
this.elementCount = elementCount
this
}

/**
* Increments the element count.
*
* @return this instance
*/
def incrementElementCount(): BlockBuilder = setElementCount(elementCount + 1)

/**
* Sets replicated flag
*
* @param replicated the replicated flag value to set
*/
def setReplicated(replicated: Boolean): BlockBuilder = {
this.replicated = replicated
this
}

/**
* Ends the block.
*
* @return the IndexFileBuilder instance
*/
def endBlock(): IndexFileBuilder = owner.addBlock(
new Block(
None,
cubeId.get,
minWeight.getOrElse(Weight.MinValue),
maxWeight.getOrElse(Weight.MaxValue),
elementCount,
replicated))

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PointWeightIndexer(val tableChanges: TableChanges) extends Serializable {
var continue = true
while (continue && containers.hasNext) {
val cubeId = containers.next()
tableChanges.cubeWeights(cubeId) match {
tableChanges.cubeWeight(cubeId) match {
case Some(cubeWeight) if weight <= cubeWeight =>
builder += cubeId
continue = tableChanges.announcedOrReplicatedSet.contains(cubeId)
Expand Down
Loading
Loading