Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #454: Next in line Rollup #455

Draft
wants to merge 1 commit into
base: refactor
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions core/src/main/scala/io/qbeast/spark/writer/Rollup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,22 @@ private[writer] class Rollup(limit: Double) {
* the rollup result
*/
def compute(): Map[CubeId, CubeId] = {
val queue = new mutable.PriorityQueue()(Ordering.by[CubeId, Int](_.depth))
val queue: mutable.PriorityQueue[CubeId] =
new mutable.PriorityQueue()(Ordering.by[CubeId, Int](_.depth))
groups.keys.foreach(queue.enqueue(_))
while (queue.nonEmpty) {
val cubeId = queue.dequeue()
val group = groups(cubeId)
val group: Group = groups(cubeId)
if (group.size < limit && !cubeId.isRoot) {
val Some(parentCubeId) = cubeId.parent
if (groups.contains(parentCubeId)) {
groups(parentCubeId).add(group)
val nextInLine = cubeId.nextSibling match {
case Some(a) => a
case None => cubeId.parent.get
}
if (groups.contains(nextInLine)) {
groups(nextInLine).add(group)
} else {
groups.put(parentCubeId, group)
queue.enqueue(parentCubeId)
groups.put(nextInLine, group)
queue.enqueue(nextInLine)
}
groups.remove(cubeId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec {
}

it should "compute rollup correctly when optimizing" in
withSparkAndTmpDir { (spark, tmpDir) =>
withSparkAndTmpDir { (_, tmpDir) =>
val revision =
Revision(1L, 0, QTableID(tmpDir), 20, Vector(EmptyTransformer("col_1")), Vector.empty)

Expand All @@ -77,7 +77,7 @@ class DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec {
Map(root -> 20L, c1 -> 1L, c2 -> 20L))

val rollup = DeltaRollupDataWriter.computeRollup(tc)
rollup shouldBe Map(root -> root, c1 -> root, c2 -> c2)
rollup shouldBe Map(root -> root, c1 -> c2, c2 -> c2)
}

}
54 changes: 33 additions & 21 deletions src/test/scala/io/qbeast/spark/writer/RollupTest.scala
Original file line number Diff line number Diff line change
@@ -1,30 +1,12 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.writer

import io.qbeast.core.model.CubeId
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

/**
* Tests of Rollup.
*/
class RollupTest extends AnyFlatSpec with Matchers {

"Rollup" should "work correctly" in {
"Rollup" should "work correctly with basic cube structure" in {
val root = CubeId.root(1)
val c0 = root.firstChild
val c1 = c0.nextSibling.get
Expand All @@ -44,8 +26,38 @@ class RollupTest extends AnyFlatSpec with Matchers {
result(root) shouldBe root
result(c00) shouldBe c0
result(c01) shouldBe c0
result(c10) shouldBe root
result(c10) shouldBe c11 // rolling up into the next sibling.
result(c11) shouldBe c11
}

}
it should "handle empty rollup" in {
  // A rollup with no populated cubes must produce an empty mapping.
  val rollup = new Rollup(3)
  val mapping = rollup.compute()
  mapping shouldBe empty
}

it should "handle single cube" in {
  // A lone root cube always maps to itself: the root is never rolled up,
  // regardless of whether its size is below the limit.
  val rootCube = CubeId.root(1)
  val rollup = new Rollup(3).populate(rootCube, 2)
  val mapping = rollup.compute()
  mapping(rootCube) shouldBe rootCube
}

it should "not roll up cubes whose sizes meet or exceed the limit" in {
  // With limit = 2: the root never rolls up (it has no parent), and both
  // children have sizes >= limit, so every cube should map to itself.
  val root = CubeId.root(1)
  val children = root.children.toSeq
  val firstChild = children(0)
  // NOTE(review): kids(1) is the root's second child, i.e. a SIBLING of
  // firstChild — the old name `grandChild` was misleading.
  val secondChild = children(1)

  val result = new Rollup(2)
    .populate(root, 1) // below the limit, but the root stays in place
    .populate(firstChild, 2) // meets the limit — stays in place
    .populate(secondChild, 3) // exceeds the limit — stays in place
    .compute()

  result(root) shouldBe root
  result(firstChild) shouldBe firstChild
  result(secondChild) shouldBe secondChild
}


}
Loading