From 3833d204f5ad403546c1e3958eab044da088737c Mon Sep 17 00:00:00 2001 From: Eric Charles Date: Tue, 27 Mar 2018 16:56:46 +0200 Subject: [PATCH] first changes to support client-mode --- .../k8s/integrationtest/KubernetesSuite.scala | 37 +++++++++++++++++-- .../KubernetesTestComponents.scala | 3 +- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 04782d9..aaf2d14 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -174,14 +174,26 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit sparkAppConf .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar") runSparkJVMCheckAndVerifyCompletion( - expectedJVMValue=Seq("(spark.test.foo,spark.test.bar)")) + expectedJVMValue=Seq("(spark.test.foo,spark.test.bar)"), + deployMode = "cluster") +/* + runSparkJVMCheckAndVerifyCompletion( + expectedJVMValue=Seq("(spark.test.foo,spark.test.bar)"), + deployMode = "client") +*/ } test("Run SparkRemoteFileTest using a remote data file") { sparkAppConf .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE) runSparkRemoteCheckAndVerifyCompletion( - appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME)) + appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME), + deployMode = "cluster") +/* + runSparkRemoteCheckAndVerifyCompletion( + appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME), + deployMode = "client") +*/ } private def runSparkPiAndVerifyCompletion( @@ -190,18 +202,32 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, appArgs: Array[String] = Array.empty[String], appLocator: String = appLocator): Unit = { +/* + runSparkApplicationAndVerifyCompletion( + appResource, + SPARK_PI_MAIN_CLASS, + Seq("Pi is roughly 3"), + appArgs, + "client", + driverPodChecker, + executorPodChecker, + appLocator) +*/ runSparkApplicationAndVerifyCompletion( appResource, SPARK_PI_MAIN_CLASS, Seq("Pi is roughly 3"), appArgs, + "client", driverPodChecker, executorPodChecker, appLocator) + } private def runSparkRemoteCheckAndVerifyCompletion( appResource: String = containerLocalSparkDistroExamplesJar, + deployMode: String, driverPodChecker: Pod => Unit = doBasicDriverPodCheck, executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, appArgs: Array[String], @@ -211,6 +237,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit SPARK_REMOTE_MAIN_CLASS, Seq(s"Mounting of ${appArgs.head} was true"), appArgs, + deployMode, driverPodChecker, executorPodChecker, appLocator) @@ -218,6 +245,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit private def runSparkJVMCheckAndVerifyCompletion( appResource: String = containerLocalSparkDistroExamplesJar, + deployMode: String, mainClass: String = SPARK_DRIVER_MAIN_CLASS, driverPodChecker: Pod => Unit = doBasicDriverPodCheck, appArgs: Array[String] = Array("5"), @@ -226,7 +254,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) - SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) + SparkAppLauncher.launch(appArguments, sparkAppConf, deployMode, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) val driverPod = kubernetesTestComponents.kubernetesClient .pods() @@ -253,6 +281,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit mainClass: String, expectedLogOnCompletion: Seq[String], appArgs: Array[String], + deployMode: String, driverPodChecker: Pod => Unit, executorPodChecker: Pod => Unit, appLocator: String): Unit = { @@ -260,7 +289,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) - SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) + SparkAppLauncher.launch(appArguments, sparkAppConf, deployMode, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) val driverPod = kubernetesTestComponents.kubernetesClient .pods() diff --git a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index b9c87d5..f59e21d 100644 --- a/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -98,12 +98,13 @@ private[spark] object SparkAppLauncher extends Logging { def launch( appArguments: SparkAppArguments, appConf: SparkAppConf, + deployMode: String, timeoutSecs: Int, sparkHomeDir: Path): Unit = { val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit")) logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf") val commandLine = Array(sparkSubmitExecutable.toFile.getAbsolutePath, - "--deploy-mode", "cluster", + "--deploy-mode", deployMode, "--class", appArguments.mainClass, "--master", appConf.get("spark.master") ) ++ appConf.toStringArray :+