diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 0ff7b3cd9f892..6d39ae900927d 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml
@@ -358,7 +358,7 @@ jobs: python-version: '3.11' architecture: x64 - name: Install Python packages (Python 3.11) - if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect') + if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect') || contains(matrix.modules, 'yarn') run: | python3.11 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'lxml==4.9.4' 'grpcio==1.67.0' 'grpcio-status==1.67.0' 'protobuf==5.29.1' python3.11 -m pip list
diff --git a/LICENSE b/LICENSE index 97f41c4474340..3f987cc7cdbac 100644 --- a/LICENSE +++ b/LICENSE
@@ -247,6 +247,7 @@ core/src/main/resources/org/apache/spark/ui/static/sorttable.js docs/js/vendor/anchor.min.js docs/js/vendor/jquery* docs/js/vendor/modernizer* +docs/js/vendor/docsearch.min.js ISC License -----------
diff --git a/common/tags/src/main/java/org/apache/spark/annotation/ClassicOnly.java b/common/tags/src/main/java/org/apache/spark/annotation/ClassicOnly.java new file mode 100644 index 0000000000000..7a4818d964611 --- /dev/null +++ b/common/tags/src/main/java/org/apache/spark/annotation/ClassicOnly.java
@@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.annotation; + +import java.lang.annotation.*; + +/** + * This method is only available in the Classic implementation of the Scala/Java SQL API. + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface ClassicOnly { }
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index dc45583726833..ae84f566554dd 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3923,6 +3923,12 @@ ], "sqlState" : "0A000" }, + "NEGATIVE_SCALE_DISALLOWED" : { "message" : [ "Negative scale is not allowed: '<scale>'. Set the config <sqlConf> to \"true\" to allow it." ], "sqlState" : "0A000" }, "NEGATIVE_VALUES_IN_FREQUENCY_EXPRESSION" : { "message" : [ "Found the negative value in <frequencyExpression>: <negativeValue>, but expected a positive integral value."
@@ -4936,12 +4942,24 @@ }, "sqlState" : "42KDE" }, + "STREAMING_PYTHON_RUNNER_INITIALIZATION_COMMUNICATION_FAILURE" : { "message" : [ "Streaming Runner initialization failed during initial config communication. Cause: <msg>" ], "sqlState" : "XXKST" }, "STREAMING_PYTHON_RUNNER_INITIALIZATION_FAILURE" : { "message" : [ "Streaming Runner initialization failed, returned <resFromPython>. Cause: <msg>" ], "sqlState" : "XXKST" }, + "STREAMING_PYTHON_RUNNER_INITIALIZATION_TIMEOUT_FAILURE" : { "message" : [ "Streaming Runner initialization failed. Socket connection timeout. Cause: <msg>" ], "sqlState" : "XXKST" }, "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : { "message" : [ "Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.",
@@ -5376,11 +5394,6 @@ "Resilient Distributed Datasets (RDDs)." ] }, - "REGISTER_UDAF" : { - "message" : [ - "Registering User Defined Aggregate Functions (UDAFs)." - ] - }, "SESSION_BASE_RELATION_TO_DATAFRAME" : { "message" : [ "Invoking SparkSession 'baseRelationToDataFrame'. This is server side developer API"
diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala index 58ca979cc57b6..6f9708def2f2b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -59,6 +59,8 @@ * to be used with the functions. */ def init(): (DataOutputStream, DataInputStream) = { + logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Sending necessary information to the " + + log"Python worker") val env = SparkEnv.get val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
@@ -76,36 +78,44 @@ pythonWorker = Some(worker) pythonWorkerFactory = Some(workerFactory) - logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Python worker created") - - val stream = new BufferedOutputStream( - pythonWorker.get.channel.socket().getOutputStream, bufferSize) + val socket = pythonWorker.get.channel.socket() + val stream = new BufferedOutputStream(socket.getOutputStream, bufferSize) + val dataIn = new DataInputStream(new BufferedInputStream(socket.getInputStream, bufferSize)) val dataOut = new DataOutputStream(stream) - logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Sending necessary information to the " + - log"Python worker") + val originalTimeout = socket.getSoTimeout() + // Set timeout to 5 minutes during initialization config transmission + socket.setSoTimeout(5 * 60 * 1000) - PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) + val resFromPython = try { + PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) - // Send sessionId - if (!sessionId.isEmpty) { - PythonRDD.writeUTF(sessionId, dataOut) - } + + // Send sessionId + if (!sessionId.isEmpty) { + PythonRDD.writeUTF(sessionId, dataOut) + } - // Send the user function to python process - PythonWorkerUtils.writePythonFunction(func, dataOut) - dataOut.flush() + + // Send the user function to python process + PythonWorkerUtils.writePythonFunction(func, dataOut) + dataOut.flush() - logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Reading initialization response from " + - log"Python runner.") + + logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Reading initialization response from " + + log"Python runner.") + dataIn.readInt() + } catch { + case e: java.net.SocketTimeoutException => + throw new StreamingPythonRunnerInitializationTimeoutException(e.getMessage) + case e: Exception => + throw new StreamingPythonRunnerInitializationCommunicationException(e.getMessage) + } - val dataIn = new DataInputStream( - new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize)) + // Set timeout back to the original timeout + // Should be infinity by default + socket.setSoTimeout(originalTimeout) - val resFromPython = dataIn.readInt() if (resFromPython != 0) { val errMessage = PythonWorkerUtils.readUTF(dataIn) - throw streamingPythonRunnerInitializationFailure(resFromPython, errMessage) + throw new StreamingPythonRunnerInitializationException(resFromPython, errMessage) } logInfo(log"[session: ${MDC(SESSION_ID, sessionId)}] Runner initialization succeeded " + log"(returned ${MDC(PYTHON_WORKER_RESPONSE, resFromPython)}).")
@@ -113,10 +123,15 @@ (dataOut, dataIn) } - def streamingPythonRunnerInitializationFailure(resFromPython: Int, errMessage: String): - StreamingPythonRunnerInitializationException = { - new StreamingPythonRunnerInitializationException(resFromPython, errMessage) - } + class StreamingPythonRunnerInitializationCommunicationException(errMessage: String) + extends SparkPythonException( + errorClass = "STREAMING_PYTHON_RUNNER_INITIALIZATION_COMMUNICATION_FAILURE", + messageParameters = Map("msg" -> errMessage)) + + class StreamingPythonRunnerInitializationTimeoutException(errMessage: String) + extends SparkPythonException( + errorClass = "STREAMING_PYTHON_RUNNER_INITIALIZATION_TIMEOUT_FAILURE", + messageParameters = Map("msg" -> errMessage)) class StreamingPythonRunnerInitializationException(resFromPython: Int, errMessage: String) extends SparkPythonException(
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index e9507fa6bee48..87cef4d242ab5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy import java.io.File import java.net.URI import java.nio.file.Files +import java.util.Locale import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._
@@ -52,9 +53,12 @@ object PythonRunner { // Format python file paths before adding them to the PYTHONPATH val formattedPythonFile = formatPath(pythonFile) val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles)) + val apiMode = sparkConf.get(SPARK_API_MODE).toLowerCase(Locale.ROOT) + val isAPIModeClassic = apiMode == "classic" + val isAPIModeConnect = apiMode == "connect" var gatewayServer: Option[Py4JServer] = None - if (sparkConf.getOption("spark.remote").isEmpty) { + if (sparkConf.getOption("spark.remote").isEmpty || isAPIModeClassic) { gatewayServer = Some(new Py4JServer(sparkConf)) val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.get.start() })
@@ -80,7 +84,7 @@ object PythonRunner { // Launch Python process val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava) val env = builder.environment() - if (sparkConf.getOption("spark.remote").nonEmpty) { + if (sparkConf.getOption("spark.remote").nonEmpty || isAPIModeConnect) { // For non-local remote, pass configurations to environment variables so // Spark Connect client sets them. For local remotes, they will be set // via Py4J.
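The StreamingPythonRunner.init() change above bounds the initial handshake with the Python worker by a socket read timeout and restores the original timeout afterwards, so a silent hang surfaces as STREAMING_PYTHON_RUNNER_INITIALIZATION_TIMEOUT_FAILURE rather than blocking forever. A minimal standalone sketch of that pattern, using plain java.net and a hypothetical HandshakeTimeoutException rather than Spark's classes:

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream}
import java.net.{Socket, SocketTimeoutException}

// Hypothetical stand-in for Spark's StreamingPythonRunnerInitializationTimeoutException.
class HandshakeTimeoutException(msg: String) extends RuntimeException(msg)

object TimedHandshake {
  // Send a payload and wait for a single int reply, but only for `timeoutMs`;
  // the socket's original timeout is restored afterwards so later reads keep
  // their default (usually infinite) behaviour.
  def handshake(socket: Socket, payload: Array[Byte], timeoutMs: Int): Int = {
    val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream))
    val in = new DataInputStream(new BufferedInputStream(socket.getInputStream))
    val originalTimeout = socket.getSoTimeout
    socket.setSoTimeout(timeoutMs)
    try {
      out.writeInt(payload.length)
      out.write(payload)
      out.flush()
      in.readInt() // blocks; throws SocketTimeoutException once timeoutMs elapses
    } catch {
      case e: SocketTimeoutException =>
        throw new HandshakeTimeoutException(s"no response within $timeoutMs ms: ${e.getMessage}")
    } finally {
      socket.setSoTimeout(originalTimeout) // 0 means "wait forever"
    }
  }
}
```

Unlike the diff, which resets the timeout after the try/catch, this sketch restores it in a finally block; on the success path the two are equivalent.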
@@ -90,7 +94,11 @@ env.put(s"PYSPARK_REMOTE_INIT_CONF_$idx", compact(render(group))) } } - sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url)) + if (isAPIModeClassic) { + sparkConf.getOption("spark.master").foreach(url => env.put("MASTER", url)) + } else { + sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url)) + } env.put("PYTHONPATH", pythonPath) // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5dda7afc3ebcb..fc3f22abd9d75 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2821,4 +2821,15 @@ package object config { .version("4.0.0") .timeConf(TimeUnit.MILLISECONDS) .createOptional + + private[spark] val SPARK_API_MODE = + ConfigBuilder("spark.api.mode") + .doc("For Spark Classic applications, specify whether to automatically use Spark Connect " + + "by running a local Spark Connect server dedicated to the application. The server is " + + "terminated when the application is terminated. The value can be `classic` or `connect`.") + .version("4.0.0") + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .checkValues(Set("connect", "classic")) + .createWithDefault("classic") }
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index c9d1a89c60dc9..212cd1e10e8a2 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py
@@ -1127,6 +1127,7 @@ def __hash__(self): "pyspark.ml.tests.connect.test_parity_clustering", "pyspark.ml.tests.connect.test_parity_evaluation", "pyspark.ml.tests.connect.test_parity_feature", + "pyspark.ml.tests.connect.test_parity_functions", "pyspark.ml.tests.connect.test_parity_pipeline", "pyspark.ml.tests.connect.test_parity_tuning", "pyspark.ml.tests.connect.test_parity_ovr",
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index f5a20dd441b0e..4d6ddfc2c74a1 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html
@@ -22,8 +22,7 @@ {% endif %}
@@ -31,7 +30,7 @@ {% production %}
@@ -193,15 +192,13 @@
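The spark.api.mode config introduced above (default "classic") and the PythonRunner branching together decide how a Python application is launched: classic keeps the Py4J gateway and exposes the master URL through the MASTER environment variable, while connect points the client at a Spark Connect endpoint via SPARK_REMOTE. The sketch below is a simplified mirror of that selection for illustration only, not the actual PythonRunner code; in practice an application would normally just be submitted with --conf spark.api.mode=connect.

```scala
import java.util.Locale
import scala.collection.mutable
import org.apache.spark.SparkConf

object ApiModeSketch {
  // Simplified mirror of the PythonRunner branching above: decide which
  // environment variable the launched Python process should see.
  def pythonEnvFor(conf: SparkConf): Map[String, String] = {
    val env = mutable.Map[String, String]()
    // "classic" is the default declared by SPARK_API_MODE in this diff.
    val apiMode = conf.get("spark.api.mode", "classic").toLowerCase(Locale.ROOT)
    if (apiMode == "connect") {
      // Connect mode: point the PySpark client at a (local or remote) Connect server.
      conf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url))
    } else {
      // Classic mode: keep the Py4J gateway and expose the master URL instead.
      conf.getOption("spark.master").foreach(url => env.put("MASTER", url))
    }
    env.toMap
  }
}
```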
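The @ClassicOnly annotation added under common/tags is a pure marker: runtime retention makes it visible to reflection, and it documents Scala/Java SQL API methods that only exist in the Classic (non-Connect) implementation. A hypothetical usage sketch follows; SessionLike and sparkContext are illustrative names, and which members actually receive the marker is outside this diff.

```scala
import org.apache.spark.annotation.ClassicOnly

trait SessionLike {
  // Available in both Classic and Connect implementations.
  def version: String

  // Marker only: documents (and, via RetentionPolicy.RUNTIME, makes
  // discoverable by reflection) that this member has no Connect counterpart.
  @ClassicOnly
  def sparkContext: AnyRef
}
```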