From 1ffc0fb5b86e68e9cb85194d9180f467df47da6d Mon Sep 17 00:00:00 2001 From: "445092967@qq.com" <445092967@qq.com> Date: Tue, 7 Nov 2023 16:18:16 +0800 Subject: [PATCH] Add a configurable DLQ capacity of avoid OOM --- .../file/AdaptivePrefetchCachePolicy.java | 2 ++ .../main/java/alluxio/conf/PropertyKey.java | 19 ++++++++++++++----- .../java/alluxio/master/job/DoraLoadJob.java | 7 ++++--- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/AdaptivePrefetchCachePolicy.java b/dora/core/client/fs/src/main/java/alluxio/client/file/AdaptivePrefetchCachePolicy.java index f0d3b101a51a..5d2c6e837cf2 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/AdaptivePrefetchCachePolicy.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/AdaptivePrefetchCachePolicy.java @@ -45,6 +45,8 @@ public void onCacheMissRead() { // previous position // halve the prefetch size to be conservative mPrefetchSize /= 2; + // To avoid the convergence of the prefetch size at 2 * read length - 1 + mPrefetchSize++; } @Override diff --git a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java index d8135da9e6f3..3785636a8fb8 100755 --- a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -2311,11 +2311,7 @@ public String toString() { public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD = intBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD) .setDefaultValue(-1) - .setDescription( - "The load job total load failure count threshold. -1 means never fail. " - + "Note that we persist failed tasks in memory for retrying purpose and " - + "on average one subtask takes up 0.5KB in memory. Properly set this property " - + "to avoid OOM.") + .setDescription("The load job total load failure count threshold. -1 means never fail. ") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); @@ -2327,6 +2323,17 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); + public static final PropertyKey MASTER_DORA_LOAD_JOB_RETRY_DLQ_CAPACITY = + intBuilder(Name.MASTER_DORA_LOAD_JOB_RETRY_DLQ_CAPACITY) + .setDefaultValue(1_000_000) + .setDescription( + "The capacity of the dead letter queue we persist failed tasks in memory " + + "for retrying purpose. Once the queue is full, failed subtasks will not retry" + + "and will just fail. On average one subtask takes up 0.5KB in memory. " + + " Properly set this property to avoid OOM.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.SERVER) + .build(); public static final PropertyKey MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR = stringBuilder(Name.MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR) .setDefaultValue(format("${%s}/job_results/load", Name.WORK_DIR)) @@ -7554,6 +7561,8 @@ public static final class Name { "alluxio.master.dora.load.job.total.failure.count.threshold"; public static final String MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD = "alluxio.master.dora.load.job.total.failure.ratio.threshold"; + public static final String MASTER_DORA_LOAD_JOB_RETRY_DLQ_CAPACITY = + "alluxio.master.dora.load.job.retry.dlq.capacity"; public static final String MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR = "alluxio.master.dora.load.job.failed.file.list.dir"; public static final String MASTER_DAILY_BACKUP_ENABLED = diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java b/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java index fbbe5d3f5f47..d237a365c1c7 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java @@ -128,6 +128,8 @@ public class DoraLoadJob extends AbstractJob { PropertyKey.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD); private static final int FAILURE_COUNT_THRESHOLD = Configuration.getInt( PropertyKey.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD); + private static final int RETRY_DLQ_CAPACITY = Configuration.getInt( + PropertyKey.MASTER_DORA_LOAD_JOB_RETRY_DLQ_CAPACITY); private final boolean mSkipIfExists; private final Optional mFileFilterRegx; @@ -404,7 +406,7 @@ public void addLoadedBytes(long bytes) { private void addSubTaskToRetryOrFail(LoadSubTask subTask, FailureReason reason, String message) { LOG.debug("Retry file {}", subTask); - if (subTask.isRetry() || !isHealthy()) { + if (subTask.isRetry() || !isHealthy() || mRetrySubTasksDLQ.size() >= RETRY_DLQ_CAPACITY) { addFileFailure(subTask, reason, message); return; } @@ -460,8 +462,7 @@ public int hashCode() { @Override public boolean isHealthy() { - // Tasks to retry are considered as "failed" tasks when calculating if the job is healthy. - long totalFailureCount = mTotalFinalFailureCount.get() + mRetrySubTasksDLQ.size(); + long totalFailureCount = mTotalFinalFailureCount.get(); if (FAILURE_RATIO_THRESHOLD >= 1.0 || FAILURE_COUNT_THRESHOLD < 0) { return true; }