[SPARK-50819] Refactor Spark profiler module
### What changes were proposed in this pull request?

This PR refactors the `ExecutorProfilerPlugin` to prepare for supporting driver profiling. It

- re-organizes the classes from the `o.a.s.executor.profiler` package to `o.a.s.profiler`, with some class renaming.
- re-organizes the configuration namespace from `spark.executor.profiling.` to `spark.profiler.`, and tunes some configuration keys (the old-to-new mapping is summarized below).
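
For reference, the renames introduced by this PR (taken from the diff below):

```
# configuration keys
spark.executor.profiling.enabled        ->  spark.profiler.executor.enabled
spark.executor.profiling.fraction       ->  spark.profiler.executor.fraction
spark.executor.profiling.dfsDir         ->  spark.profiler.dfsDir
spark.executor.profiling.localDir       ->  spark.profiler.localDir
spark.executor.profiling.options        ->  spark.profiler.asyncProfiler.args
spark.executor.profiling.writeInterval  ->  spark.profiler.dfsWriteInterval

# classes
org.apache.spark.executor.profiler.ExecutorProfilerPlugin     ->  org.apache.spark.profiler.ProfilerPlugin
org.apache.spark.executor.profiler.JVMProfilerExecutorPlugin  ->  org.apache.spark.profiler.ProfilerExecutorPlugin
org.apache.spark.executor.profiler.ExecutorJVMProfiler        ->  org.apache.spark.profiler.SparkAsyncProfiler
```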

### Why are the changes needed?

This makes the profiler more generic, so that we can
- extend its scope to support driver profiling, see #49483 (a rough, hypothetical sketch of a driver-side plugin is shown after this list)
- potentially support other profilers, such as the JDK built-in JFR
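
To illustrate the direction (not part of this PR), here is a minimal, hypothetical sketch of what a driver-side counterpart could look like on top of the refactored classes. The `ProfilerDriverPlugin` name and the `spark.profiler.driver.enabled` key are assumptions for illustration only; in this PR, `ProfilerPlugin.driverPlugin()` still returns `null`.

```scala
import java.util.{Map => JMap}

import scala.jdk.CollectionConverters._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
import org.apache.spark.internal.Logging

// Hypothetical sketch: a follow-up (see #49483) could have ProfilerPlugin.driverPlugin()
// return something along these lines instead of null.
class ProfilerDriverPlugin extends DriverPlugin with Logging {

  private var profiler: SparkAsyncProfiler = _

  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
    val conf: SparkConf = ctx.conf()
    // "spark.profiler.driver.enabled" is an assumed key, mirroring spark.profiler.executor.enabled.
    if (conf.getBoolean("spark.profiler.driver.enabled", defaultValue = false)) {
      logInfo("Driver selected for profiling")
      // Reuse the renamed SparkAsyncProfiler, tagging the output as the driver's profile.
      profiler = new SparkAsyncProfiler(conf, "driver")
      profiler.start()
    }
    Map.empty[String, String].asJava
  }

  override def shutdown(): Unit = {
    if (profiler != null) {
      profiler.stop()
    }
  }
}
```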

### Does this PR introduce _any_ user-facing change?

No, it's an unreleased feature.

### How was this patch tested?

GHA verifies that the code compiles, and I tested it manually on a YARN cluster:
```
bin/spark-submit run-example \
   --deploy-mode client \
   --conf spark.plugins=org.apache.spark.profiler.ProfilerPlugin \
   --conf spark.profiler.executor.enabled=true \
   --conf spark.profiler.executor.fraction=1 \
   --conf spark.profiler.dfsDir=hdfs:///spark-profiling \
   SparkPi 100000
```
```
$ hadoop fs -ls /spark-profiling/application_1736320707252_0029
Found 48 items
-rw-rw----   3 hadoop supergroup    4944251 2025-01-15 11:21 /spark-profiling/application_1736320707252_0029/profile-exec-1.jfr
-rw-rw----   3 hadoop supergroup    3527597 2025-01-15 11:21 /spark-profiling/application_1736320707252_0029/profile-exec-10.jfr
-rw-rw----   3 hadoop supergroup    3352900 2025-01-15 11:21 /spark-profiling/application_1736320707252_0029/profile-exec-11.jfr
-rw-rw----   3 hadoop supergroup    3464907 2025-01-15 11:21 /spark-profiling/application_1736320707252_0029/profile-exec-12.jfr
...
```
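
For reference (not part of the test run above), a produced profile can be fetched locally for inspection, e.g. with JDK Mission Control or the flame-graph converter shipped with async-profiler:

```
hadoop fs -get /spark-profiling/application_1736320707252_0029/profile-exec-1.jfr .
```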

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49492 from pan3793/SPARK-50819.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
pan3793 authored and dongjoon-hyun committed Jan 15, 2025
1 parent 9841ae2 commit 4bd37aa
Showing 4 changed files with 75 additions and 76 deletions.
46 changes: 23 additions & 23 deletions connector/profiler/README.md
@@ -39,7 +39,7 @@ For more information on async_profiler see the [Async Profiler Manual](https://k
To enable code profiling, first enable the code profiling plugin via

```
spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin
spark.plugins=org.apache.spark.profiler.ProfilerPlugin
```

Then enable the profiling in the configuration.
@@ -50,15 +50,23 @@ Then enable the profiling in the configuration.
<table class="spark-config">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.executor.profiling.enabled</code></td>
<td><code>spark.profiler.executor.enabled</code></td>
<td><code>false</code></td>
<td>
If true, will enable code profiling
If true, turn on profiling in executors.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.dfsDir</code></td>
<td><code>spark.profiler.executor.fraction</code></td>
<td>0.10</td>
<td>
The fraction of executors on which to enable profiling. The executors to be profiled are picked at random.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.profiler.dfsDir</code></td>
<td>(none)</td>
<td>
An HDFS compatible path to which the profiler's output files are copied. The output files will be written as <i>dfsDir/{{APP_ID}}/profile-exec-{{EXECUTOR_ID}}.jfr</i> <br/>
@@ -67,36 +75,28 @@ Then enable the profiling in the configuration.
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.localDir</code></td>
<td><code>spark.profiler.localDir</code></td>
<td><code>.</code> i.e. the executor's working dir</td>
<td>
The local directory in the executor container to write the jfr files to. If not specified the file will be written to the executor's working directory. Users should ensure there is sufficient disk space available on the system as running out of space may result in corrupt jfr file and even cause jobs to fail on systems like K8s.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.options</code></td>
<td><code>spark.profiler.asyncProfiler.args</code></td>
<td>event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s</td>
<td>
Options to pass to the profiler. Detailed options are documented in the comments here:
Arguments to pass to the Async Profiler. Detailed options are documented in the comments here:
<a href="https://github.com/async-profiler/async-profiler/blob/v3.0/src/arguments.cpp#L44">Profiler arguments</a>.
Note that the options to start, stop, specify output format, and output file do not have to be specified.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.fraction</code></td>
<td>0.10</td>
<td>
The fraction of executors on which to enable code profiling. The executors to be profiled are picked at random.
Note that the arguments to start, stop, specify output format, and output file do not have to be specified.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.writeInterval</code></td>
<td><code>spark.profiler.dfsWriteInterval</code></td>
<td>30</td>
<td>
Time interval, in seconds, after which the profiler output will be synced to dfs.
Time interval, in seconds, after which the profiler output will be synced to DFS.
</td>
<td>4.0.0</td>
</tr>
@@ -115,11 +115,11 @@ On Kubernetes, spark will try to shut down the executor pods while the profiler
--master <master-url> \
--deploy-mode <deploy-mode> \
-c spark.executor.extraJavaOptions="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer" \
-c spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \
-c spark.executor.profiling.enabled=true \
-c spark.executor.profiling.dfsDir=s3a://my-bucket/spark/profiles/ \
-c spark.executor.profiling.options=event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s \
-c spark.executor.profiling.fraction=0.10 \
-c spark.plugins=org.apache.spark.profiler.ProfilerPlugin \
-c spark.profiler.executor.enabled=true \
-c spark.profiler.executor.fraction=0.10 \
-c spark.profiler.dfsDir=s3a://my-bucket/spark/profiles/ \
-c spark.profiler.asyncProfiler.args=event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s \
-c spark.kubernetes.executor.deleteOnTermination=false \
<application-jar> \
[application-arguments]
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor.profiler
package org.apache.spark.profiler

import java.util.{Map => JMap}

@@ -28,42 +28,41 @@ import org.apache.spark.internal.LogKeys.EXECUTOR_ID


/**
* Spark plugin to do JVM code profiling of executors
* Spark plugin to do profiling
*/
class ExecutorProfilerPlugin extends SparkPlugin {
class ProfilerPlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = null

// No-op
override def executorPlugin(): ExecutorPlugin = new JVMProfilerExecutorPlugin
override def executorPlugin(): ExecutorPlugin = new ProfilerExecutorPlugin
}

class JVMProfilerExecutorPlugin extends ExecutorPlugin with Logging {
class ProfilerExecutorPlugin extends ExecutorPlugin with Logging {

private var sparkConf: SparkConf = _
private var pluginCtx: PluginContext = _
private var profiler: ExecutorJVMProfiler = _
private var codeProfilingEnabled: Boolean = _
private var codeProfilingFraction: Double = _
private var profiler: SparkAsyncProfiler = _
private var executorProfilerEnabled: Boolean = _
private var executorProfilerFraction: Double = _
private val rand: Random = new Random(System.currentTimeMillis())

override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
pluginCtx = ctx
sparkConf = ctx.conf()
codeProfilingEnabled = sparkConf.get(EXECUTOR_PROFILING_ENABLED)
if (codeProfilingEnabled) {
codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION)
if (rand.nextInt(100) * 0.01 < codeProfilingFraction) {
executorProfilerEnabled = sparkConf.get(PROFILER_EXECUTOR_ENABLED)
if (executorProfilerEnabled) {
executorProfilerFraction = sparkConf.get(PROFILER_EXECUTOR_FRACTION)
if (rand.nextInt(100) * 0.01 < executorProfilerFraction) {
logInfo(log"Executor id ${MDC(EXECUTOR_ID, pluginCtx.executorID())} " +
log"selected for JVM code profiling")
profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID())
log"selected for profiling")
profiler = new SparkAsyncProfiler(sparkConf, pluginCtx.executorID())
profiler.start()
}
}
Map.empty[String, String].asJava
}

override def shutdown(): Unit = {
logInfo("Executor JVM profiler shutting down")
logInfo("Executor profiler shutting down")
if (profiler != null) {
profiler.stop()
}
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor.profiler
package org.apache.spark.profiler

import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
@@ -31,16 +31,16 @@ import org.apache.spark.util.{ThreadUtils, Utils}


/**
* A class that enables the async JVM code profiler
* A class that wraps AsyncProfiler
*/
private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging {
private[spark] class SparkAsyncProfiler(conf: SparkConf, executorId: String) extends Logging {

private var running = false
private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
private val profilerDfsDirOpt = conf.get(EXECUTOR_PROFILING_DFS_DIR)
private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
private val enableProfiler = conf.get(PROFILER_EXECUTOR_ENABLED)
private val profilerOptions = conf.get(PROFILER_ASYNC_PROFILER_OPTIONS)
private val profilerDfsDirOpt = conf.get(PROFILER_DFS_DIR)
private val profilerLocalDir = conf.get(PROFILER_LOCAL_DIR)
private val writeInterval = conf.get(PROFILER_DFS_WRITE_INTERVAL)

private val appId = try {
conf.getAppId
@@ -76,14 +76,14 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
try {
profiler.foreach(p => {
p.execute(startcmd)
logInfo("Executor JVM profiling started.")
logInfo("Profiling started.")
running = true
startWriting()
})
} catch {
case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
logError("JVM profiling aborted. Exception occurred in profiler native code: ", e)
case e: Exception => logWarning("Executor JVM profiling aborted due to exception: ", e)
logError("Profiling aborted. Exception occurred in async-profiler native code: ", e)
case e: Exception => logWarning("Profiling aborted due to exception: ", e)
}
}
}
@@ -93,7 +93,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
if (running) {
profiler.foreach(p => {
p.execute(stopcmd)
logInfo("JVM profiler stopped")
logInfo("Profiler stopped")
running = false
finishWriting()
})
@@ -125,7 +125,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex

outputStream = FileSystem.create(fs, new Path(profileOutputFile), PROFILER_FILE_PERMISSIONS)
try {
logInfo(log"Copying executor profiling file to ${MDC(PATH, profileOutputFile)}")
logInfo(log"Copying profiling file to ${MDC(PATH, profileOutputFile)}")
inputStream = new BufferedInputStream(
new FileInputStream(s"$profilerLocalDir/$profileFile"))
threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
@@ -139,7 +139,7 @@ private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) ex
writing = true
} catch {
case e: Exception =>
logError("Failed to start JVM profiler", e)
logError("Failed to start profiler", e)
if (threadpool != null) {
threadpool.shutdownNow()
}
@@ -14,55 +14,55 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
package org.apache.spark

import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder

package object profiler {

private[profiler] val EXECUTOR_PROFILING_ENABLED =
ConfigBuilder("spark.executor.profiling.enabled")
.doc("Turn on code profiling via async_profiler in executors.")
private[profiler] val PROFILER_EXECUTOR_ENABLED =
ConfigBuilder("spark.profiler.executor.enabled")
.doc("Turn on profiling in executors.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

private[profiler] val EXECUTOR_PROFILING_DFS_DIR =
ConfigBuilder("spark.executor.profiling.dfsDir")
private[profiler] val PROFILER_EXECUTOR_FRACTION =
ConfigBuilder("spark.profiler.executor.fraction")
.doc("Fraction of executors to profile")
.version("4.0.0")
.doubleConf
.checkValue(v => v >= 0.0 && v <= 1.0,
"Fraction of executors to profile must be in [0,1]")
.createWithDefault(0.1)

private[profiler] val PROFILER_DFS_DIR =
ConfigBuilder("spark.profiler.dfsDir")
.doc("HDFS compatible file-system path to where the profiler will write output jfr files.")
.version("4.0.0")
.stringConf
.createOptional

private[profiler] val EXECUTOR_PROFILING_LOCAL_DIR =
ConfigBuilder("spark.executor.profiling.localDir")
private[profiler] val PROFILER_LOCAL_DIR =
ConfigBuilder("spark.profiler.localDir")
.doc("Local file system path on executor where profiler output is saved. Defaults to the " +
"working directory of the executor process.")
.version("4.0.0")
.stringConf
.createWithDefault(".")

private[profiler] val EXECUTOR_PROFILING_OPTIONS =
ConfigBuilder("spark.executor.profiling.options")
.doc("Options to pass on to the async profiler.")
private[profiler] val PROFILER_ASYNC_PROFILER_OPTIONS =
ConfigBuilder("spark.profiler.asyncProfiler.args")
.doc("Arguments to pass on to the Async Profiler.")
.version("4.0.0")
.stringConf
.createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s")

private[profiler] val EXECUTOR_PROFILING_FRACTION =
ConfigBuilder("spark.executor.profiling.fraction")
.doc("Fraction of executors to profile")
.version("4.0.0")
.doubleConf
.checkValue(v => v >= 0.0 && v <= 1.0,
"Fraction of executors to profile must be in [0,1]")
.createWithDefault(0.1)

private[profiler] val EXECUTOR_PROFILING_WRITE_INTERVAL =
ConfigBuilder("spark.executor.profiling.writeInterval")
.doc("Time interval in seconds after which the profiler output will be synced to dfs")
private[profiler] val PROFILER_DFS_WRITE_INTERVAL =
ConfigBuilder("spark.profiler.dfsWriteInterval")
.doc("Time interval in seconds after which the profiler output will be synced to DFS.")
.version("4.0.0")
.timeConf(TimeUnit.SECONDS)
.checkValue(_ >= 0, "Write interval should be non-negative")
