diff --git a/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt b/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt new file mode 100644 index 00000000000..87f7c6e3dd8 --- /dev/null +++ b/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt @@ -0,0 +1,6 @@ +Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6 +Apple M3 +Collecting files ranger access request: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +50000 files benchmark 1281 1310 33 -0.0 -1280563000.0 1.0X + diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml index 8ab3720b13f..b3970cc2c89 100644 --- a/extensions/spark/kyuubi-spark-authz/pom.xml +++ b/extensions/spark/kyuubi-spark-authz/pom.xml @@ -380,6 +380,13 @@ ${hudi.artifact} test + + org.apache.spark + spark-core_${scala.binary.version} + test-jar + test + + diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala index e25cd2a7004..1fd86a3789c 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.plugin.spark.authz.ranger -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import org.apache.ranger.plugin.policyengine.RangerAccessRequest import org.apache.spark.sql.SparkSession @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import 
org.apache.kyuubi.plugin.spark.authz._ import org.apache.kyuubi.plugin.spark.authz.ObjectType._ +import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._ import org.apache.kyuubi.plugin.spark.authz.rule.Authorization import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ @@ -34,15 +35,19 @@ case class RuleAuthorization(spark: SparkSession) extends Authorization(spark) { val auditHandler = new SparkRangerAuditHandler val ugi = getAuthzUgi(spark.sparkContext) val (inputs, outputs, opType) = PrivilegesBuilder.build(plan, spark) - val requests = new ArrayBuffer[AccessRequest]() + + // Use a HashSet to deduplicate the same AccessResource and AccessType; the requests will be all + // the non-duplicate requests and in the same order as the input requests. + val requests = new mutable.ArrayBuffer[AccessRequest]() + val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]() def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = { objects.foreach { obj => val resource = AccessResource(obj, opType) val accessType = ranger.AccessType(obj, opType, isInput) - if (accessType != AccessType.NONE && !requests.exists(o => - o.accessType == accessType && o.getResource == resource)) { + if (accessType != AccessType.NONE && !requestsSet.contains((resource, accessType))) { requests += AccessRequest(resource, ugi, opType, accessType) + requestsSet.add((resource, accessType)) } } } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala new file mode 100644 index 00000000000..b1be2c95778 --- /dev/null +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala @@ -0,0 +1,71 @@ +/* + *
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.spark.authz.benchmark + +import java.io.{File, FileOutputStream, OutputStream} + +import scala.collection.JavaConverters._ + +import com.google.common.reflect.ClassPath +import org.scalatest.Assertions._ + +trait KyuubiBenchmarkBase { + var output: Option[OutputStream] = None + + private val prefix = { + val benchmarkClasses = ClassPath.from(Thread.currentThread.getContextClassLoader) + .getTopLevelClassesRecursive("org.apache.spark.sql").asScala.toArray + assert(benchmarkClasses.nonEmpty) + val benchmark = benchmarkClasses.find(_.load().getName.endsWith("Benchmark")) + val targetDirOrProjDir = + new File(benchmark.get.load().getProtectionDomain.getCodeSource.getLocation.toURI) + .getParentFile.getParentFile + if (targetDirOrProjDir.getName == "target") { + targetDirOrProjDir.getParentFile.getCanonicalPath + "/" + } else { + targetDirOrProjDir.getCanonicalPath + "/" + } + } + + def withHeader(func: => Unit): Unit = { + val version = System.getProperty("java.version").split("\\D+")(0).toInt + val jdkString = if (version > 8) s"-jdk$version" else "" + val resultFileName = + s"${this.getClass.getSimpleName.replace("$", 
"")}$jdkString-results.txt" + val dir = new File(s"${prefix}benchmarks/") + if (!dir.exists()) { + // scalastyle:off println + println(s"Creating ${dir.getAbsolutePath} for benchmark results.") + // scalastyle:on println + dir.mkdirs() + } + val file = new File(dir, resultFileName) + if (!file.exists()) { + file.createNewFile() + } + output = Some(new FileOutputStream(file)) + + func + + output.foreach { o => + if (o != null) { + o.close() + } + } + } +} diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala new file mode 100644 index 00000000000..4286d2fca35 --- /dev/null +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import java.io.File +import java.nio.file.Files + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.reflect.io.Path.jfile2path + +import org.apache.spark.benchmark.Benchmark +import org.scalatest.BeforeAndAfterAll +// scalastyle:off +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider +import org.apache.kyuubi.plugin.spark.authz.benchmark.KyuubiBenchmarkBase +import org.apache.kyuubi.plugin.spark.authz.ranger.RuleAuthorization +import org.apache.kyuubi.util.ThreadUtils + +/** + * Benchmark to measure performance of collecting ranger access requests. + * + * {{{ + * RUN_BENCHMARK=1 ./build/mvn clean test \ + * -pl extensions/spark/kyuubi-spark-authz -am \ + * -Dtest=none -DwildcardSuites=org.apache.spark.sql.RuleAuthorizationBenchmark + * }}} + */ +class RuleAuthorizationBenchmark extends AnyFunSuite + with SparkSessionProvider with BeforeAndAfterAll + with KyuubiBenchmarkBase { + // scalastyle:on + + override protected val catalogImpl: String = "hive" + private val runBenchmark = sys.env.contains("RUN_BENCHMARK") + + private val fileNumbers = 50000 + + private var tempDir: File = _ + + override def beforeAll(): Unit = { + tempDir = Files.createTempDirectory("kyuubi-test-").toFile + } + + override def afterAll(): Unit = { + if (tempDir != null) { + tempDir.deleteRecursively() + } + spark.stop() + super.afterAll() + } + + test("Collecting files ranger access request") { + assume(runBenchmark) + + val futures = (1 to fileNumbers).map { i => + Future { + val file = new File(tempDir, s"file_$i.txt") + file.createNewFile() + } + } + val allFutures = Future.sequence(futures) + ThreadUtils.awaitResult(allFutures, Duration.Inf) + + val df = spark.read.text(tempDir + "/file_*.txt") + val plan = df.queryExecution.optimizedPlan + + withHeader { + val benchmark = new 
Benchmark("Collecting files ranger access request", -1, output = output) +      benchmark.addCase(s"$fileNumbers files benchmark", 3) { _ => +        RuleAuthorization(spark).checkPrivileges(spark, plan) +      } +      benchmark.run() +    } +  } +}