From 001b8a22cb12b024d06a06a5fd2d851e3eec100c Mon Sep 17 00:00:00 2001 From: cugni Date: Mon, 28 Oct 2024 12:28:17 +0100 Subject: [PATCH] Implementing the "next in line" change to the rollup algorithm to fix #454 --- .../scala/io/qbeast/spark/writer/Rollup.scala | 18 ++++--- .../delta/DeltaRollupDataWriterTest.scala | 4 +- .../io/qbeast/spark/writer/RollupTest.scala | 54 +++++++++++-------- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala index dcefe5628..75b73af18 100644 --- a/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala +++ b/core/src/main/scala/io/qbeast/spark/writer/Rollup.scala @@ -54,18 +54,22 @@ private[writer] class Rollup(limit: Double) { * the rollup result */ def compute(): Map[CubeId, CubeId] = { - val queue = new mutable.PriorityQueue()(Ordering.by[CubeId, Int](_.depth)) + val queue: mutable.PriorityQueue[CubeId] = + new mutable.PriorityQueue()(Ordering.by[CubeId, Int](_.depth)) groups.keys.foreach(queue.enqueue(_)) while (queue.nonEmpty) { val cubeId = queue.dequeue() - val group = groups(cubeId) + val group: Group = groups(cubeId) if (group.size < limit && !cubeId.isRoot) { - val Some(parentCubeId) = cubeId.parent - if (groups.contains(parentCubeId)) { - groups(parentCubeId).add(group) + val nextInLine = cubeId.nextSibling match { + case Some(a) => a + case None => cubeId.parent.get + } + if (groups.contains(nextInLine)) { + groups(nextInLine).add(group) } else { - groups.put(parentCubeId, group) - queue.enqueue(parentCubeId) + groups.put(nextInLine, group) + queue.enqueue(nextInLine) } groups.remove(cubeId) } diff --git a/src/test/scala/io/qbeast/spark/delta/DeltaRollupDataWriterTest.scala b/src/test/scala/io/qbeast/spark/delta/DeltaRollupDataWriterTest.scala index e7ad21439..c8a98f72f 100644 --- a/src/test/scala/io/qbeast/spark/delta/DeltaRollupDataWriterTest.scala +++ 
b/src/test/scala/io/qbeast/spark/delta/DeltaRollupDataWriterTest.scala @@ -62,7 +62,7 @@ class DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec { } it should "compute rollup correctly when optimizing" in - withSparkAndTmpDir { (spark, tmpDir) => + withSparkAndTmpDir { (_, tmpDir) => val revision = Revision(1L, 0, QTableID(tmpDir), 20, Vector(EmptyTransformer("col_1")), Vector.empty) @@ -77,7 +77,7 @@ class DeltaRollupDataWriterTest extends QbeastIntegrationTestSpec { Map(root -> 20L, c1 -> 1L, c2 -> 20L)) val rollup = DeltaRollupDataWriter.computeRollup(tc) - rollup shouldBe Map(root -> root, c1 -> root, c2 -> c2) + rollup shouldBe Map(root -> root, c1 -> c2, c2 -> c2) } } diff --git a/src/test/scala/io/qbeast/spark/writer/RollupTest.scala b/src/test/scala/io/qbeast/spark/writer/RollupTest.scala index 2aceec6fd..7aaad64ea 100644 --- a/src/test/scala/io/qbeast/spark/writer/RollupTest.scala +++ b/src/test/scala/io/qbeast/spark/writer/RollupTest.scala @@ -1,30 +1,12 @@ -/* - * Copyright 2021 Qbeast Analytics, S.L. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package io.qbeast.spark.writer import io.qbeast.core.model.CubeId import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -/** - * Tests of Rollup. 
- */ class RollupTest extends AnyFlatSpec with Matchers { - "Rollup" should "work correctly" in { + "Rollup" should "work correctly with basic cube structure" in { val root = CubeId.root(1) val c0 = root.firstChild val c1 = c0.nextSibling.get @@ -44,8 +26,38 @@ class RollupTest extends AnyFlatSpec with Matchers { result(root) shouldBe root result(c00) shouldBe c0 result(c01) shouldBe c0 - result(c10) shouldBe root + result(c10) shouldBe c11 // rolling up into the next sibling. result(c11) shouldBe c11 } -} + it should "handle empty rollup" in { + val result = new Rollup(3).compute() + result shouldBe empty + } + + it should "handle single cube" in { + val root = CubeId.root(1) + val result = new Rollup(3) + .populate(root, 2) + .compute() + + result(root) shouldBe root + } + + it should "roll up to parent when size exceeds limit" in { + val root = CubeId.root(1) + val kids = root.children.toSeq + val child = kids(0) + val grandChild = kids(1) + + val result = new Rollup(2) + .populate(root,1) + .populate(child,2) + .populate(grandChild, 3) // Exceeds limit + .compute() + + result(grandChild) shouldBe grandChild + } + + +} \ No newline at end of file