LIVY-246 Support multiple Spark home in runtime #318

Open. Wants to merge 6 commits into base: master. Showing changes from 2 commits.

6 changes: 6 additions & 0 deletions conf/livy.conf.template
@@ -31,6 +31,12 @@
# If livy should impersonate the requesting users when creating a new session.
# livy.impersonation.enabled = true

+# Livy Spark Home
+# livy.server.spark-home-1.5.2=<path>/spark-1.5.2
+# livy.server.spark-home-1.6.3=<path>/spark-1.6.3
+# livy.server.spark-home-2.0.1=<path>/spark-2.0.1
+# livy.server.spark-home-2.1.0=<path>/spark-2.1.0

# Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation
# directory every time a session is started. By caching these files in HDFS, for example, startup
# time of sessions on YARN can be reduced.
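
The template entries above follow a `livy.server.spark-home-<version>` naming convention. As a minimal sketch of how that convention maps versions to paths, the snippet below collects every such key from a `java.util.Properties` object into a version-to-path map. The PR itself looks up one key at a time; `SparkHomeKeys`, `versionedHomes`, and the `/opt/...` paths are hypothetical names used only for illustration.

```scala
import java.util.Properties
import scala.collection.JavaConverters._

object SparkHomeKeys {
  // Prefix taken from the template entries; everything else here is illustrative.
  val Prefix = "livy.server.spark-home-"

  /** Collect every `livy.server.spark-home-<version>` entry into a version -> path map. */
  def versionedHomes(props: Properties): Map[String, String] =
    props.stringPropertyNames().asScala
      .filter(_.startsWith(Prefix))
      .map(key => key.stripPrefix(Prefix) -> props.getProperty(key))
      .toMap

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Placeholder paths; a real deployment would point at actual Spark installs.
    props.setProperty("livy.server.spark-home-1.6.3", "/opt/spark-1.6.3")
    props.setProperty("livy.server.spark-home-2.1.0", "/opt/spark-2.1.0")
    props.setProperty("livy.impersonation.enabled", "true") // unrelated key, ignored

    versionedHomes(props).toSeq.sorted.foreach { case (version, home) =>
      println(s"$version -> $home")
    }
  }
}
```

A listing like this could be useful for reporting which versions a server accepts, but that is a design option, not something this PR adds.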
16 changes: 13 additions & 3 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
@@ -245,15 +245,25 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

+/** Return the spark home version */
+def SPARK_HOME_VER(version: String): Option[String] = {
Contributor

I was thinking more along the lines of adding
def SPARK_HOME_VER(version: String) = Entry(s"livy.server.spark-home-$version", null)
to the LivyConf object above, as a def alongside the existing vals. Inside the sparkHome def you could then call Option(get(SPARK_HOME_VER(version))) and add an orElse that throws the exception, as before.

Author

If I declare def SPARK_HOME_VER(version: String) = Entry(s"livy.server.spark-home-$version", null), then I can't use it inside def sparkHome(version: Option[String] = None): Option[String] like this:

version.map { version => Option(get(SPARK_HOME_VER(version))).orElse(throw ...) }

because it is a map and it will not accept null values.

Contributor

If I'm understanding you correctly, my comment below should fix that. I'll edit that comment to clarify it.
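
The null concern raised in this thread can be checked in isolation. The sketch below assumes that the configuration `get` returns `null` for an unset key (the `Option(get(...))` wrapping in the diff suggests this, but it is an assumption here). `VersionedLookup`, `settings`, and `sparkHomeKey` are hypothetical stand-ins, not Livy APIs. It shows that `Option(null)` becomes `None`, and that `orElse` takes its argument by name, so the `throw` only runs when the key is missing.

```scala
object VersionedLookup {
  // Stand-in for the Livy configuration; assumed to yield null for an unset key.
  private val settings = Map("livy.server.spark-home-2.1.0" -> "/opt/spark-2.1.0")
  def get(key: String): String = settings.getOrElse(key, null)

  // Hypothetical helper that builds the key the same way the template names it.
  def sparkHomeKey(version: String): String = s"livy.server.spark-home-$version"

  /** Return the configured home for a version, or fail if the version is unknown. */
  def sparkHomeFor(version: String): Option[String] =
    Option(get(sparkHomeKey(version))) // Option(null) is None, so no null escapes
      .orElse(throw new IllegalArgumentException(
        s"Spark version: $version is not supported")) // evaluated only on None

  def main(args: Array[String]): Unit = {
    println(sparkHomeFor("2.1.0")) // prints Some(/opt/spark-2.1.0)
    // An unknown version raises instead of returning a null-wrapped value.
    println(scala.util.Try(sparkHomeFor("9.9.9")).isFailure) // prints true
  }
}
```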

+Option(get(s"livy.server.spark-home-$version"))
+.orElse(throw new IllegalArgumentException(
+s"Spark version: $version is not supported"))
+}
/** Return the location of the spark home directory */
-def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))
+def sparkHome(version: Option[String] = None): Option[String] = {
+version.map {version => SPARK_HOME_VER(version)
Contributor

@ajbozarth, Apr 28, 2017

On a second look, it may be better to use a match here. Assuming the SPARK_HOME_VER def I suggested above, I recommend something like:

version match {
   case Some(version) => Option(get(SPARK_HOME_VER(version))).orElse(throw ...)
   case None => Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))
}

where the None case is just the old code. This would be easier to read overall.
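
The match-based shape suggested in this comment can be sketched as a self-contained function. To keep it runnable outside Livy, the configuration and the environment are passed in as plain maps. `SparkHomeResolver` and its parameters are hypothetical, and `livy.server.spark-home` is assumed to be the unversioned key behind `SPARK_HOME`.

```scala
object SparkHomeResolver {
  /**
   * conf: stand-in for the Livy configuration (a plain Map, not LivyConf).
   * env:  stand-in for sys.env, passed in so the fallback can be exercised.
   */
  def sparkHome(
      version: Option[String],
      conf: Map[String, String],
      env: Map[String, String]): Option[String] = version match {
    case Some(v) =>
      // A requested version must be configured explicitly; there is no fallback.
      conf.get(s"livy.server.spark-home-$v").orElse(
        throw new IllegalArgumentException(s"Spark version: $v is not supported"))
    case None =>
      // Old behaviour: unversioned key first (key name assumed), then SPARK_HOME.
      conf.get("livy.server.spark-home").orElse(env.get("SPARK_HOME"))
  }

  def main(args: Array[String]): Unit = {
    val conf = Map("livy.server.spark-home-2.1.0" -> "/opt/spark-2.1.0")
    val env = Map("SPARK_HOME" -> "/opt/spark-default")
    println(sparkHome(Some("2.1.0"), conf, env)) // prints Some(/opt/spark-2.1.0)
    println(sparkHome(None, conf, env))          // prints Some(/opt/spark-default)
  }
}
```

Keeping the two cases side by side makes it visible that only the versioned branch throws, while the unversioned branch can still return `None`.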

+}.getOrElse(Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME")))
+}

/** Return the spark master Livy sessions should use. */
def sparkMaster(): String = get(LIVY_SPARK_MASTER)

/** Return the path to the spark-submit executable. */
-def sparkSubmit(): String = {
-sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
+def sparkSubmit(version: Option[String] = None): String = {
+sparkHome(version).map { _ + File.separator + "bin" + File.separator +
+"spark-submit" }.get
}

/** Return the list of superusers. */
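
In the `sparkSubmit` change above, the final `.get` throws a `NoSuchElementException` when no Spark home resolves at all (no version requested, nothing configured, and no `SPARK_HOME` in the environment). As a minimal sketch of an alternative, the helper below builds the same `bin/spark-submit` path but returns an `Option`, leaving the failure handling to the caller. `SparkSubmitPath` is a hypothetical name, not part of Livy.

```scala
import java.io.File

object SparkSubmitPath {
  /** Join a Spark home with bin/spark-submit, mirroring the concatenation in the diff. */
  def sparkSubmit(sparkHome: Option[String]): Option[String] =
    sparkHome.map(home => Seq(home, "bin", "spark-submit").mkString(File.separator))

  def main(args: Array[String]): Unit = {
    // The separator is platform dependent, so the printed path varies by OS.
    println(sparkSubmit(Some("/opt/spark-2.1.0")))
    println(sparkSubmit(None)) // prints None instead of throwing
  }
}
```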

@@ -18,6 +18,7 @@

package com.cloudera.livy.server.batch

+import java.io.File
import java.lang.ProcessBuilder.Redirect

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
@@ -62,7 +63,7 @@ object BatchSession {
request.conf, request.jars, request.files, request.archives, request.pyFiles, livyConf))
require(request.file != null, "File is required.")

-val builder = new SparkProcessBuilder(livyConf)
+val builder = new SparkProcessBuilder(livyConf, request.sparkVersion)
builder.conf(conf)

proxyUser.foreach(builder.proxyUser)

@@ -36,5 +36,6 @@ class CreateBatchRequest {
var queue: Option[String] = None
var name: Option[String] = None
var conf: Map[String, String] = Map()
+var sparkVersion: Option[String] = None

}

@@ -25,9 +25,9 @@ import scala.collection.mutable.ArrayBuffer
import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.util.LineBufferedProcess

-class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
+class SparkProcessBuilder(livyConf: LivyConf, version: Option[String]) extends Logging {

-private[this] var _executable: String = livyConf.sparkSubmit()
+private[this] var _executable: String = livyConf.sparkSubmit(version)
private[this] var _master: Option[String] = None
private[this] var _deployMode: Option[String] = None
private[this] var _className: Option[String] = None