Add a configurable DLQ capacity to avoid OOM
### What changes are proposed in this pull request?

1. Add a property to limit the DLQ (dead letter queue) capacity in distributed load
2. Fix a position reader bug where the prefetch size converges at 2 * read length - 1 (see the sketch after the first diff below)

pr-link: Alluxio#18387
change-id: cid-2a20dd7c85de56926cd3a6bbc67c3ed6e8c14299
elega authored and ssz1997 committed Dec 15, 2023
1 parent 066a267 commit b9bf227
Showing 3 changed files with 20 additions and 8 deletions.
@@ -45,6 +45,8 @@ public void onCacheMissRead() {
     // previous position
     // halve the prefetch size to be conservative
     mPrefetchSize /= 2;
+    // To avoid the convergence of the prefetch size at 2 * read length - 1
+    mPrefetchSize++;
   }
 
   @Override
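Why the one-byte bump matters: if the window grows by one read length per miss-retry cycle and is halved (integer division) on each miss, the recurrence `x -> x / 2 + L` gets stuck at its odd fixed point `2 * L - 1`, one byte short of covering two full reads. The sketch below just simulates that recurrence; the growth rule is an assumption for illustration, not necessarily the exact adaptive policy in this class.

```java
// Illustrative simulation of the fixed point described in the PR.
// Assumption: the window grows by one read length per miss-retry cycle;
// this is a sketch, not Alluxio's actual prefetch policy.
public class PrefetchConvergenceDemo {
  public static void main(String[] args) {
    final int readLen = 100;
    int withoutFix = 0;
    int withFix = 0;
    for (int i = 0; i < 20; i++) {
      withoutFix = withoutFix / 2 + readLen;   // old: halve on miss, then grow
      withFix = withFix / 2 + 1 + readLen;     // new: halve, bump by one, then grow
    }
    System.out.println("without fix: " + withoutFix); // 199 = 2 * 100 - 1, stuck
    System.out.println("with fix:    " + withFix);    // 201, covers two full reads
  }
}
```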
19 changes: 14 additions & 5 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
@@ -2311,11 +2311,7 @@ public String toString() {
   public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD =
       intBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD)
           .setDefaultValue(-1)
-          .setDescription(
-              "The load job total load failure count threshold. -1 means never fail. "
-                  + "Note that we persist failed tasks in memory for retrying purpose and "
-                  + "on average one subtask takes up 0.5KB in memory. Properly set this property "
-                  + "to avoid OOM.")
+          .setDescription("The load job total load failure count threshold. -1 means never fail. ")
           .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
           .setScope(Scope.SERVER)
           .build();
@@ -2327,6 +2323,17 @@ public String toString() {
           .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
           .setScope(Scope.SERVER)
           .build();
+  public static final PropertyKey MASTER_DORA_LOAD_JOB_RETRY_DLQ_CAPACITY =
+      intBuilder(Name.MASTER_DORA_LOAD_JOB_RETRY_DLQ_CAPACITY)
+          .setDefaultValue(1_000_000)
+          .setDescription(
+              "The capacity of the dead letter queue where failed tasks are kept "
+                  + "in memory for retry. Once the queue is full, failed subtasks "
+                  + "are not retried and simply fail. On average one subtask takes "
+                  + "up 0.5KB in memory. Set this property appropriately to avoid OOM.")
+          .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
+          .setScope(Scope.SERVER)
+          .build();
   public static final PropertyKey MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR =
       stringBuilder(Name.MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR)
           .setDefaultValue(format("${%s}/job_results/load", Name.WORK_DIR))
@@ -7555,6 +7562,8 @@ public static final class Name {
         "alluxio.master.dora.load.job.total.failure.count.threshold";
     public static final String MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD =
         "alluxio.master.dora.load.job.total.failure.ratio.threshold";
+    public static final String MASTER_DORA_LOAD_JOB_RETRY_DLQ_CAPACITY =
+        "alluxio.master.dora.load.job.retry.dlq.capacity";
     public static final String MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR =
         "alluxio.master.dora.load.job.failed.file.list.dir";
     public static final String MASTER_DAILY_BACKUP_ENABLED =
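For sizing: at roughly 0.5KB per queued subtask, the default capacity of 1,000,000 bounds the DLQ at about 500MB of heap. Operators on memory-constrained masters can lower it, e.g. in `alluxio-site.properties` (the value below is only an example):

```properties
# Cap the retry DLQ at 200,000 subtasks (~100MB of heap at ~0.5KB each).
alluxio.master.dora.load.job.retry.dlq.capacity=200000
```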
@@ -128,6 +128,8 @@ public class DoraLoadJob extends AbstractJob<DoraLoadJob.DoraLoadTask> {
       PropertyKey.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD);
   private static final int FAILURE_COUNT_THRESHOLD = Configuration.getInt(
       PropertyKey.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD);
+  private static final int RETRY_DLQ_CAPACITY = Configuration.getInt(
+      PropertyKey.MASTER_DORA_LOAD_JOB_RETRY_DLQ_CAPACITY);
   private final boolean mSkipIfExists;
 
   private final Optional<String> mFileFilterRegx;
@@ -404,7 +406,7 @@ public void addLoadedBytes(long bytes) {
 
   private void addSubTaskToRetryOrFail(LoadSubTask subTask, FailureReason reason, String message) {
     LOG.debug("Retry file {}", subTask);
-    if (subTask.isRetry() || !isHealthy()) {
+    if (subTask.isRetry() || !isHealthy() || mRetrySubTasksDLQ.size() >= RETRY_DLQ_CAPACITY) {
       addFileFailure(subTask, reason, message);
       return;
     }
@@ -460,8 +462,7 @@ public int hashCode() {
 
   @Override
   public boolean isHealthy() {
-    // Tasks to retry are considered as "failed" tasks when calculating if the job is healthy.
-    long totalFailureCount = mTotalFinalFailureCount.get() + mRetrySubTasksDLQ.size();
+    long totalFailureCount = mTotalFinalFailureCount.get();
    if (FAILURE_RATIO_THRESHOLD >= 1.0 || FAILURE_COUNT_THRESHOLD < 0) {
      return true;
    }
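Two behavioral notes on this file: a subtask that arrives while the DLQ is at capacity is finalized as a failure instead of being queued, and `isHealthy()` now counts only final failures, so retry-pending subtasks no longer push the job toward its failure threshold. Below is a minimal self-contained sketch of the same bounded-admission pattern; the names are illustrative and do not mirror Alluxio's internals.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of a capacity-bounded dead letter queue: admit a task for one
// retry while there is room, otherwise record a final failure.
class BoundedRetryQueue<T> {
  private final Queue<T> mDlq = new ArrayDeque<>();
  private final int mCapacity;
  private int mFinalFailureCount;

  BoundedRetryQueue(int capacity) {
    mCapacity = capacity;
  }

  /** Queues the task for a retry, or counts it as a final failure. */
  void retryOrFail(T task, boolean alreadyRetried) {
    if (alreadyRetried || mDlq.size() >= mCapacity) {
      mFinalFailureCount++; // fail fast rather than growing the queue without bound
      return;
    }
    mDlq.offer(task);
  }

  int finalFailureCount() {
    return mFinalFailureCount;
  }
}
```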
