diff --git a/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt b/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt
new file mode 100644
index 00000000000..87f7c6e3dd8
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt
@@ -0,0 +1,6 @@
+Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
+Apple M3
+Collecting files ranger access request: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+50000 files benchmark 1281 1310 33 -0.0 -1280563000.0 1.0X
+
diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml
index 8ab3720b13f..b3970cc2c89 100644
--- a/extensions/spark/kyuubi-spark-authz/pom.xml
+++ b/extensions/spark/kyuubi-spark-authz/pom.xml
@@ -380,6 +380,13 @@
${hudi.artifact}
test
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
index e25cd2a7004..1fd86a3789c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.plugin.spark.authz.ranger
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
import org.apache.ranger.plugin.policyengine.RangerAccessRequest
import org.apache.spark.sql.SparkSession
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.kyuubi.plugin.spark.authz._
import org.apache.kyuubi.plugin.spark.authz.ObjectType._
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType
import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
import org.apache.kyuubi.plugin.spark.authz.rule.Authorization
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
@@ -34,15 +35,19 @@ case class RuleAuthorization(spark: SparkSession) extends Authorization(spark) {
val auditHandler = new SparkRangerAuditHandler
val ugi = getAuthzUgi(spark.sparkContext)
val (inputs, outputs, opType) = PrivilegesBuilder.build(plan, spark)
- val requests = new ArrayBuffer[AccessRequest]()
+
+    // Track (AccessResource, AccessType) pairs in a HashSet so duplicates are skipped
+    // in O(1); `requests` preserves the insertion order of the distinct requests.
+ val requests = new mutable.ArrayBuffer[AccessRequest]()
+ val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]()
def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = {
objects.foreach { obj =>
val resource = AccessResource(obj, opType)
val accessType = ranger.AccessType(obj, opType, isInput)
- if (accessType != AccessType.NONE && !requests.exists(o =>
- o.accessType == accessType && o.getResource == resource)) {
+ if (accessType != AccessType.NONE && !requestsSet.contains((resource, accessType))) {
requests += AccessRequest(resource, ugi, opType, accessType)
+        requestsSet.add((resource, accessType))
}
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala
new file mode 100644
index 00000000000..b1be2c95778
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.benchmark
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+import scala.collection.JavaConverters._
+
+import com.google.common.reflect.ClassPath
+import org.scalatest.Assertions._
+
+trait KyuubiBenchmarkBase {
+ var output: Option[OutputStream] = None
+
+ private val prefix = {
+ val benchmarkClasses = ClassPath.from(Thread.currentThread.getContextClassLoader)
+ .getTopLevelClassesRecursive("org.apache.spark.sql").asScala.toArray
+    val benchmark = benchmarkClasses.find(_.load().getName.endsWith("Benchmark"))
+    assert(benchmark.isDefined, "No *Benchmark class found under org.apache.spark.sql")
+    val targetDirOrProjDir =
+      new File(benchmark.get.load().getProtectionDomain.getCodeSource.getLocation.toURI)
+ .getParentFile.getParentFile
+ if (targetDirOrProjDir.getName == "target") {
+ targetDirOrProjDir.getParentFile.getCanonicalPath + "/"
+ } else {
+ targetDirOrProjDir.getCanonicalPath + "/"
+ }
+ }
+
+ def withHeader(func: => Unit): Unit = {
+ val version = System.getProperty("java.version").split("\\D+")(0).toInt
+ val jdkString = if (version > 8) s"-jdk$version" else ""
+ val resultFileName =
+ s"${this.getClass.getSimpleName.replace("$", "")}$jdkString-results.txt"
+ val dir = new File(s"${prefix}benchmarks/")
+ if (!dir.exists()) {
+ // scalastyle:off println
+ println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
+ // scalastyle:on println
+ dir.mkdirs()
+ }
+ val file = new File(dir, resultFileName)
+ if (!file.exists()) {
+ file.createNewFile()
+ }
+    output = Some(new FileOutputStream(file))
+    try {
+      func
+    } finally {
+      // Close the result stream even if the benchmark body throws,
+      // so a failed run does not leak the file handle.
+      output.foreach(_.close())
+      output = None
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
new file mode 100644
index 00000000000..4286d2fca35
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.file.Files
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.reflect.io.Path.jfile2path
+
+import org.apache.spark.benchmark.Benchmark
+import org.scalatest.BeforeAndAfterAll
+// scalastyle:off
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
+import org.apache.kyuubi.plugin.spark.authz.benchmark.KyuubiBenchmarkBase
+import org.apache.kyuubi.plugin.spark.authz.ranger.RuleAuthorization
+import org.apache.kyuubi.util.ThreadUtils
+
+/**
+ * Benchmark to measure performance of collecting ranger access requests.
+ *
+ * {{{
+ * RUN_BENCHMARK=1 ./build/mvn clean test \
+ * -pl extensions/spark/kyuubi-spark-authz -am \
+ * -Dtest=none -DwildcardSuites=org.apache.spark.sql.RuleAuthorizationBenchmark
+ * }}}
+ */
+class RuleAuthorizationBenchmark extends AnyFunSuite
+ with SparkSessionProvider with BeforeAndAfterAll
+ with KyuubiBenchmarkBase {
+ // scalastyle:on
+
+ override protected val catalogImpl: String = "hive"
+ private val runBenchmark = sys.env.contains("RUN_BENCHMARK")
+
+ private val fileNumbers = 50000
+
+ private var tempDir: File = _
+
+ override def beforeAll(): Unit = {
+ tempDir = Files.createTempDirectory("kyuubi-test-").toFile
+ }
+
+ override def afterAll(): Unit = {
+ if (tempDir != null) {
+ tempDir.deleteRecursively()
+ }
+ spark.stop()
+ super.afterAll()
+ }
+
+ test("Collecting files ranger access request") {
+ assume(runBenchmark)
+
+ val futures = (1 to fileNumbers).map { i =>
+ Future {
+ val file = new File(tempDir, s"file_$i.txt")
+ file.createNewFile()
+ }
+ }
+ val allFutures = Future.sequence(futures)
+ ThreadUtils.awaitResult(allFutures, Duration.Inf)
+
+    val df = spark.read.text(s"$tempDir/file_*.txt")
+ val plan = df.queryExecution.optimizedPlan
+
+ withHeader {
+      val benchmark = new Benchmark("Collecting files ranger access request", -1, output = output)
+ benchmark.addCase(s"$fileNumbers files benchmark", 3) { _ =>
+ RuleAuthorization(spark).checkPrivileges(spark, plan)
+ }
+ benchmark.run()
+ }
+ }
+}