Skip to content

Commit

Permalink
Merge pull request #34 from lyft/dzhi01
Browse files Browse the repository at this point in the history
Automatic staging committer conflict-mode for dynamic partition overwrite
  • Loading branch information
dzhi-lyft authored Apr 22, 2021
2 parents c879a5d + 4139868 commit 6efc63b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ class HadoopMapReduceCommitProtocol(
jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true)
jobContext.getConfiguration.setInt("mapreduce.task.partition", 0)

// Automatically derive the S3A staging committer conflict-mode from
// dynamicPartitionOverwrite: "replace" when it is enabled, "append" otherwise.
// The override is skipped only when
// spark.internal.io.hmrcp.auto-staging-conflict-mode is set to exactly "false";
// an unset key or any other value leaves it enabled.
// NOTE(review): when enabled, this overwrites any conflict-mode value already
// present in the job configuration -- confirm that is intended.
val autoConflictModeDisabled = Option(
  jobContext.getConfiguration.get("spark.internal.io.hmrcp.auto-staging-conflict-mode")
).contains("false")
if (!autoConflictModeDisabled) {
  // Wrapping the lookup in Option maps an unset key to None, so no explicit null check is needed.
  val conflictMode = if (dynamicPartitionOverwrite) "replace" else "append"
  jobContext.getConfiguration.set("fs.s3a.committer.staging.conflict-mode", conflictMode)
}

val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
committer = setupCommitter(taskAttemptContext)
committer.setupJob(jobContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.internal.io.cloud

import java.io.IOException

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
Expand Down Expand Up @@ -50,13 +48,15 @@ class PathOutputCommitProtocol(
jobId: String,
dest: String,
dynamicPartitionOverwrite: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable {
extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite) with Serializable {

if (dynamicPartitionOverwrite) {
// until there's explicit extensions to the PathOutputCommitProtocols
// to support the spark mechanism, it's left to the individual committer
// choice to handle partitioning.
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
// throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
// The exception above is disabled because HadoopMapReduceCommitProtocol now sets
// fs.s3a.committer.staging.conflict-mode automatically from dynamicPartitionOverwrite.
}

/** The committer created. */
Expand Down

0 comments on commit 6efc63b

Please sign in to comment.