Merge branch 'master' into doc_new_api
xinrong-meng authored Feb 6, 2025
2 parents 3d61c35 + 5a925c6 commit cc2c4cb
Showing 113 changed files with 3,034 additions and 1,516 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -358,7 +358,7 @@ jobs:
python-version: '3.11'
architecture: x64
- name: Install Python packages (Python 3.11)
if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect')
if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect') || contains(matrix.modules, 'yarn')
run: |
python3.11 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'lxml==4.9.4' 'grpcio==1.67.0' 'grpcio-status==1.67.0' 'protobuf==5.29.1'
python3.11 -m pip list
1 change: 1 addition & 0 deletions LICENSE
@@ -247,6 +247,7 @@ core/src/main/resources/org/apache/spark/ui/static/sorttable.js
docs/js/vendor/anchor.min.js
docs/js/vendor/jquery*
docs/js/vendor/modernizer*
docs/js/vendor/docsearch.min.js

ISC License
-----------
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.annotation;

import java.lang.annotation.*;

/**
* This method is only available in the Classic implementation of the Scala/Java SQL API.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ClassicOnly { }
23 changes: 18 additions & 5 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3923,6 +3923,12 @@
],
"sqlState" : "0A000"
},
"NEGATIVE_SCALE_DISALLOWED" : {
"message" : [
"Negative scale is not allowed: '<scale>'. Set the config <sqlConf> to \"true\" to allow it."
],
"sqlState" : "0A000"
},
"NEGATIVE_VALUES_IN_FREQUENCY_EXPRESSION" : {
"message" : [
"Found the negative value in <frequencyExpression>: <negativeValue>, but expected a positive integral value."
@@ -4936,12 +4942,24 @@
},
"sqlState" : "42KDE"
},
"STREAMING_PYTHON_RUNNER_INITIALIZATION_COMMUNICATION_FAILURE" : {
"message" : [
"Streaming Runner initialization failed during initial config communication. Cause: <msg>"
],
"sqlState" : "XXKST"
},
"STREAMING_PYTHON_RUNNER_INITIALIZATION_FAILURE" : {
"message" : [
"Streaming Runner initialization failed, returned <resFromPython>. Cause: <msg>"
],
"sqlState" : "XXKST"
},
"STREAMING_PYTHON_RUNNER_INITIALIZATION_TIMEOUT_FAILURE" : {
"message" : [
"Streaming Runner initialization failed. Socket connection timeout. Cause: <msg>"
],
"sqlState" : "XXKST"
},
"STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
"message" : [
"Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.",
@@ -5376,11 +5394,6 @@
"Resilient Distributed Datasets (RDDs)."
]
},
"REGISTER_UDAF" : {
"message" : [
"Registering User Defined Aggregate Functions (UDAFs)."
]
},
"SESSION_BASE_RELATION_TO_DATAFRAME" : {
"message" : [
"Invoking SparkSession 'baseRelationToDataFrame'. This is server side developer API"
@@ -59,6 +59,8 @@ private[spark] class StreamingPythonRunner(
* to be used with the functions.
*/
def init(): (DataOutputStream, DataInputStream) = {
logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Sending necessary information to the " +
log"Python worker")
val env = SparkEnv.get

val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
@@ -76,47 +78,60 @@
pythonWorker = Some(worker)
pythonWorkerFactory = Some(workerFactory)

logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Python worker created")

val stream = new BufferedOutputStream(
pythonWorker.get.channel.socket().getOutputStream, bufferSize)
val socket = pythonWorker.get.channel.socket()
val stream = new BufferedOutputStream(socket.getOutputStream, bufferSize)
val dataIn = new DataInputStream(new BufferedInputStream(socket.getInputStream, bufferSize))
val dataOut = new DataOutputStream(stream)

logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Sending necessary information to the " +
log"Python worker")
val originalTimeout = socket.getSoTimeout()
// Set timeout to 5 minutes during initialization config transmission
socket.setSoTimeout(5 * 60 * 1000)

PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
val resFromPython = try {
PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)

// Send sessionId
if (!sessionId.isEmpty) {
PythonRDD.writeUTF(sessionId, dataOut)
}
// Send sessionId
if (!sessionId.isEmpty) {
PythonRDD.writeUTF(sessionId, dataOut)
}

// Send the user function to python process
PythonWorkerUtils.writePythonFunction(func, dataOut)
dataOut.flush()
// Send the user function to python process
PythonWorkerUtils.writePythonFunction(func, dataOut)
dataOut.flush()

logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Reading initialization response from " +
log"Python runner.")
logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Reading initialization response from " +
log"Python runner.")
dataIn.readInt()
} catch {
case e: java.net.SocketTimeoutException =>
throw new StreamingPythonRunnerInitializationTimeoutException(e.getMessage)
case e: Exception =>
throw new StreamingPythonRunnerInitializationCommunicationException(e.getMessage)
}

val dataIn = new DataInputStream(
new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
// Set timeout back to the original timeout
// Should be infinity by default
socket.setSoTimeout(originalTimeout)

val resFromPython = dataIn.readInt()
if (resFromPython != 0) {
val errMessage = PythonWorkerUtils.readUTF(dataIn)
throw streamingPythonRunnerInitializationFailure(resFromPython, errMessage)
throw new StreamingPythonRunnerInitializationException(resFromPython, errMessage)
}
logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Runner initialization succeeded " +
log"(returned ${MDC(PYTHON_WORKER_RESPONSE, resFromPython)}).")

(dataOut, dataIn)
}

def streamingPythonRunnerInitializationFailure(resFromPython: Int, errMessage: String):
StreamingPythonRunnerInitializationException = {
new StreamingPythonRunnerInitializationException(resFromPython, errMessage)
}
class StreamingPythonRunnerInitializationCommunicationException(errMessage: String)
extends SparkPythonException(
errorClass = "STREAMING_PYTHON_RUNNER_INITIALIZATION_COMMUNICATION_FAILURE",
messageParameters = Map("msg" -> errMessage))

class StreamingPythonRunnerInitializationTimeoutException(errMessage: String)
extends SparkPythonException(
errorClass = "STREAMING_PYTHON_RUNNER_INITIALIZATION_TIMEOUT_FAILURE",
messageParameters = Map("msg" -> errMessage))

class StreamingPythonRunnerInitializationException(resFromPython: Int, errMessage: String)
extends SparkPythonException(
14 changes: 11 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy
import java.io.File
import java.net.URI
import java.nio.file.Files
import java.util.Locale

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
@@ -52,9 +53,12 @@ object PythonRunner {
// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))
val apiMode = sparkConf.get(SPARK_API_MODE).toLowerCase(Locale.ROOT)
val isAPIModeClassic = apiMode == "classic"
val isAPIModeConnect = apiMode == "connect"

var gatewayServer: Option[Py4JServer] = None
if (sparkConf.getOption("spark.remote").isEmpty) {
if (sparkConf.getOption("spark.remote").isEmpty || isAPIModeClassic) {
gatewayServer = Some(new Py4JServer(sparkConf))

val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.get.start() })
@@ -80,7 +84,7 @@
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
val env = builder.environment()
if (sparkConf.getOption("spark.remote").nonEmpty) {
if (sparkConf.getOption("spark.remote").nonEmpty || isAPIModeConnect) {
// For non-local remote, pass configurations to environment variables so
// Spark Connect client sets them. For local remotes, they will be set
// via Py4J.
@@ -90,7 +94,11 @@
env.put(s"PYSPARK_REMOTE_INIT_CONF_$idx", compact(render(group)))
}
}
sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url))
if (isAPIModeClassic) {
sparkConf.getOption("spark.master").foreach(url => env.put("MASTER", url))
} else {
sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url))
}
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2821,4 +2821,15 @@ package object config {
.version("4.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val SPARK_API_MODE =
ConfigBuilder("spark.api.mode")
.doc("For Spark Classic applications, specify whether to automatically use Spark Connect " +
"by running a local Spark Connect server dedicated to the application. The server is " +
"terminated when the application is terminated. The value can be `classic` or `connect`.")
.version("4.0.0")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("connect", "classic"))
.createWithDefault("classic")
}
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
@@ -1127,6 +1127,7 @@ def __hash__(self):
"pyspark.ml.tests.connect.test_parity_clustering",
"pyspark.ml.tests.connect.test_parity_evaluation",
"pyspark.ml.tests.connect.test_parity_feature",
"pyspark.ml.tests.connect.test_parity_functions",
"pyspark.ml.tests.connect.test_parity_pipeline",
"pyspark.ml.tests.connect.test_parity_tuning",
"pyspark.ml.tests.connect.test_parity_ovr",
13 changes: 5 additions & 8 deletions docs/_layouts/global.html
@@ -22,16 +22,15 @@
{% endif %}


<link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
<link rel="stylesheet" href="{{ rel_path_to_root }}css/bootstrap.min.css">
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&Courier+Prime:wght@400;700&display=swap" rel="stylesheet">
<link href="{{ rel_path_to_root }}css/custom.css" rel="stylesheet">
<script src="{{ rel_path_to_root}}js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>

<link rel="stylesheet" href="{{ rel_path_to_root }}css/pygments-default.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/docsearch.js@2/dist/cdn/docsearch.min.css" />
<link rel="stylesheet" href="{{ rel_path_to_root }}css/docsearch.min.css" />
<link rel="stylesheet" href="{{ rel_path_to_root }}css/docsearch.css">

{% production %}
@@ -193,15 +192,13 @@ <h1 class="title">{{ page.title }}</h1>
<!-- /container -->
</div>

<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.bundle.min.js"
integrity="sha384-MrcW6ZMFYlzcLA8Nl+NtUVF0sA7MsXsP1UyJoMp4YLEuNSfAP+JcXn/tWtIaxVXM"
crossorigin="anonymous"></script>
<script src="https://code.jquery.com/jquery.js"></script>
<script src="{{ rel_path_to_root }}js/vendor/jquery-3.5.1.min.js"></script>
<script src="{{ rel_path_to_root }}js/vendor/bootstrap.bundle.min.js"></script>

<script src="{{ rel_path_to_root }}js/vendor/anchor.min.js"></script>
<script src="{{ rel_path_to_root}}js/main.js"></script>

<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/docsearch.js@2/dist/cdn/docsearch.min.js"></script>
<script type="text/javascript" src="{{ rel_path_to_root }}js/vendor/docsearch.min.js"></script>
<script type="text/javascript">
// DocSearch is entirely free and automated. DocSearch is built in two parts:
// 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link
2 changes: 0 additions & 2 deletions docs/_plugins/build_api_docs.rb
@@ -149,11 +149,9 @@ def build_scala_and_java_docs
# Copy over the unified ScalaDoc for all projects to api/scala.
# This directory will be copied over to _site when `jekyll` command is run.
copy_and_update_scala_docs("../target/scala-2.13/unidoc", "api/scala")
# copy_and_update_scala_docs("../sql/connect/client/jvm/target/scala-2.13/unidoc", "api/connect/scala")

# Copy over the unified JavaDoc for all projects to api/java.
copy_and_update_java_docs("../target/javaunidoc", "api/java", "api/scala")
# copy_and_update_java_docs("../sql/connect/client/jvm/target/javaunidoc", "api/connect/java", "api/connect/scala")
end

def build_python_docs
27 changes: 27 additions & 0 deletions docs/app-dev-spark-connect.md
@@ -58,6 +58,33 @@ to the client via the blue box as part of the Spark Connect API. The client uses
alongside PySpark or the Spark Scala client, making it easy for Spark client applications to work
with the custom logic/library.

## Spark API Mode: Spark Client and Spark Classic

Spark provides an API mode, the `spark.api.mode` configuration, which enables Spark Classic applications
to seamlessly switch to Spark Connect. Depending on the value of `spark.api.mode`, the application
runs in either Spark Classic or Spark Connect mode. Here is an example:

{% highlight python %}
from pyspark.sql import SparkSession

SparkSession.builder.config("spark.api.mode", "connect").master("...").getOrCreate()
{% endhighlight %}

You can also apply this configuration to both Scala and PySpark applications when submitting them:

{% highlight bash %}
spark-submit --master "..." --conf spark.api.mode=connect
{% endhighlight %}

Additionally, Spark Connect offers convenient options for local testing. By setting `spark.remote`
to `local[...]` or `local-cluster[...]`, you can start a local Spark Connect server and access a Spark
Connect session.

This is similar to using `--conf spark.api.mode=connect` with `--master ...`. However, note that
`spark.remote` and `--remote` are limited to `local*` values, while `--conf spark.api.mode=connect`
with `--master ...` supports additional cluster URLs, such as spark://, for broader compatibility with
Spark Classic.
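
For instance, here is a minimal PySpark sketch of that local-testing setup (the
`local[4]` value and the `spark.range(5)` smoke test are illustrative choices only):

{% highlight python %}
from pyspark.sql import SparkSession

# Pointing the builder at a local remote starts a local Spark Connect server
# dedicated to this session (equivalent to setting spark.remote=local[4]).
spark = SparkSession.builder.remote("local[4]").getOrCreate()

spark.range(5).show()
{% endhighlight %}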

## Spark Client Applications

Spark Client Applications are the _regular Spark applications_ that Spark users develop today, e.g.,
8 changes: 8 additions & 0 deletions docs/configuration.md
@@ -3390,6 +3390,14 @@ They are typically set via the config file and command-line options with `--conf`

<table class="spark-config">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr>
<td><code>spark.api.mode</code></td>
<td>
classic
</td>
<td>For Spark Classic applications, specify whether to automatically use Spark Connect by running a local Spark Connect server. The value can be <code>classic</code> or <code>connect</code>.</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.connect.grpc.binding.port</code></td>
<td>
2 changes: 2 additions & 0 deletions docs/css/docsearch.min.css

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion docs/index.md
@@ -117,7 +117,6 @@ options for deployment:

* [Spark Python API (Sphinx)](api/python/index.html)
* [Spark Scala API (Scaladoc)](api/scala/org/apache/spark/index.html)
* [Spark Connect Scala Client API (Scaladoc)](api/connect/scala/org/apache/spark/index.html)
* [Spark Java API (Javadoc)](api/java/index.html)
* [Spark R API (Roxygen2)](api/R/index.html)
* [Spark SQL, Built-in Functions (MkDocs)](api/sql/index.html)
2 changes: 2 additions & 0 deletions docs/js/vendor/docsearch.min.js

Large diffs are not rendered by default.

@@ -49,6 +49,9 @@ public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
public static final String SPARK_REMOTE = "spark.remote";
public static final String SPARK_LOCAL_REMOTE = "spark.local.connect";

/** The Spark API mode. */
public static final String SPARK_API_MODE = "spark.api.mode";

/** The Spark deploy mode. */
public static final String DEPLOY_MODE = "spark.submit.deployMode";

@@ -384,6 +384,11 @@ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IO
if (remoteStr != null) {
env.put("SPARK_REMOTE", remoteStr);
env.put("SPARK_CONNECT_MODE_ENABLED", "1");
} else if (conf.getOrDefault(
SparkLauncher.SPARK_API_MODE, "classic").toLowerCase(Locale.ROOT).equals("connect") &&
masterStr != null) {
env.put("SPARK_REMOTE", masterStr);
env.put("SPARK_CONNECT_MODE_ENABLED", "1");
}

if (!isEmpty(pyOpts)) {
@@ -523,7 +528,9 @@ protected boolean handle(String opt, String value) {
checkArgument(value != null, "Missing argument to %s", CONF);
String[] setConf = value.split("=", 2);
checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
if (setConf[0].equals("spark.remote")) {
if (setConf[0].equals("spark.remote") ||
(setConf[0].equals(SparkLauncher.SPARK_API_MODE) &&
setConf[1].toLowerCase(Locale.ROOT).equals("connect"))) {
isRemote = true;
}
conf.put(setConf[0], setConf[1]);
