Skip to content

Commit

Permalink
Improve distributed load
Browse files Browse the repository at this point in the history
  • Loading branch information
elega committed Nov 2, 2023
1 parent ec4868d commit a16d1a5
Show file tree
Hide file tree
Showing 9 changed files with 401 additions and 190 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
/conf/metrics.properties
/conf/rocks-*.ini
/data/
/distributed_load/
/docs/_site/
/docs/api/
/docs/serve/
Expand Down
35 changes: 25 additions & 10 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -2311,7 +2311,11 @@ public String toString() {
public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD =
intBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD)
.setDefaultValue(-1)
.setDescription("The load job total load failure count threshold. -1 means never fail.")
.setDescription(
"The load job total load failure count threshold. -1 means never fail. "
+ "Note that we persist failed tasks in memory for retrying purpose and "
+ "on average one subtask takes up 0.5KB in memory. Properly set this property "
+ "to avoid OOM.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
Expand All @@ -2323,15 +2327,24 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey MASTER_DORA_LOAD_JOB_RETRIES =
intBuilder(Name.MASTER_DORA_LOAD_JOB_RETRIES)
.setDefaultValue(3)
.setDescription("The number of retry attempts before a load of file "
+ "is considered as failure")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
public static final PropertyKey MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR =
stringBuilder(Name.MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR)
.setDefaultValue(format("${%s}/distributed_load", Name.WORK_DIR))
.setDescription("The directory to store failed file list of a distributed load job.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey MASTER_DORA_LOAD_JOB_GET_LATEST_UFS_STATUS_ON_RETRY =
booleanBuilder(Name.MASTER_DORA_LOAD_JOB_GET_LATEST_UFS_STATUS_ON_RETRY)
.setDefaultValue(false)
.setDescription(
"If set true, distributed load will get the latest ufs status on retried job, "
+ "at the cost of slower file loading on retry. Turn this on if files will be "
+ "often updated during the loading process, or files are large."
)
.setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE)
.setScope(Scope.SERVER)
.build();

public static final PropertyKey MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE =
enumBuilder(Name.MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE, GraceMode.class)
.setDefaultValue(GraceMode.FORCED)
Expand Down Expand Up @@ -7552,8 +7565,10 @@ public static final class Name {
"alluxio.master.dora.load.job.total.failure.count.threshold";
public static final String MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD =
"alluxio.master.dora.load.job.total.failure.ratio.threshold";
public static final String MASTER_DORA_LOAD_JOB_RETRIES =
"alluxio.master.dora.load.job.retries";
public static final String MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR =
"alluxio.master.dora.load.job.failed.file.list.dir";
public static final String MASTER_DORA_LOAD_JOB_GET_LATEST_UFS_STATUS_ON_RETRY =
"alluxio.master.dora.load.job.get.latest.ufs.status.on.retry";
public static final String MASTER_DAILY_BACKUP_ENABLED =
"alluxio.master.daily.backup.enabled";
public static final String MASTER_DAILY_BACKUP_FILES_RETAINED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ public final class MultiDimensionalMetricsSystem {
.unit(Unit.BYTES)
.register();

// Distributed load related
public static final Counter DISTRIBUTED_LOAD_FAILURE = Counter.builder()
.name("alluxio_distributed_load_failure")
.help("counter of rpc calls of the meta operations")
.labelNames("reason", "final_attempt")
.register();

/**
* Initialize all the metrics.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ public void onTaskSubmitFailure(Task<?> task) {
LOG.debug("OnTaskSubmitFailure, retry task size:{}", mRetryTaskList.size());
}

@Override
public void onWorkerUnavailable(T task) {
LOG.warn("Worker became unavailable: {}", task.getMyRunningWorker());
}

@Override
public boolean isRunning() {
return mState == JobState.RUNNING || mState == JobState.VERIFYING;
Expand Down
Loading

0 comments on commit a16d1a5

Please sign in to comment.