diff --git a/.gitignore b/.gitignore index ab8a80bdf405..c79c49f1ac82 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ /conf/metrics.properties /conf/rocks-*.ini /data/ +/distributed_load/ /docs/_site/ /docs/api/ /docs/serve/ diff --git a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java index ddc8d8bca487..548185291c51 100755 --- a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -2311,7 +2311,11 @@ public String toString() { public static final PropertyKey MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD = intBuilder(Name.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD) .setDefaultValue(-1) - .setDescription("The load job total load failure count threshold. -1 means never fail.") + .setDescription( + "The total failure count threshold of a load job. -1 means the job never fails. " + + "Note that failed subtasks are kept in memory for retry purposes, and on " + + "average one subtask takes up 0.5KB of memory, so set this property " + + "appropriately to avoid OOM.") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); @@ -2323,15 +2327,24 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.SERVER) .build(); - public static final PropertyKey MASTER_DORA_LOAD_JOB_RETRIES = - intBuilder(Name.MASTER_DORA_LOAD_JOB_RETRIES) - .setDefaultValue(3) - .setDescription("The number of retry attempts before a load of file " - + "is considered as failure") - .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + public static final PropertyKey MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR = + stringBuilder(Name.MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR) + .setDefaultValue(format("${%s}/distributed_load", Name.WORK_DIR)) + .setDescription("The directory in which to store the failed file list of a distributed load job.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) + .setScope(Scope.SERVER) + .build(); + public static final PropertyKey MASTER_DORA_LOAD_JOB_GET_LATEST_UFS_STATUS_ON_RETRY = + booleanBuilder(Name.MASTER_DORA_LOAD_JOB_GET_LATEST_UFS_STATUS_ON_RETRY) + .setDefaultValue(false) + .setDescription( + "If set to true, distributed load will fetch the latest UFS status before retrying " + + "a subtask, at the cost of slower file loading on retry. Turn this on if files are " + + "often updated during the loading process, or if files are large."
+ ) + .setConsistencyCheckLevel(ConsistencyCheckLevel.IGNORE) .setScope(Scope.SERVER) .build(); - public static final PropertyKey MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE = enumBuilder(Name.MASTER_SHELL_BACKUP_STATE_LOCK_GRACE_MODE, GraceMode.class) .setDefaultValue(GraceMode.FORCED) @@ -7552,8 +7565,10 @@ public static final class Name { "alluxio.master.dora.load.job.total.failure.count.threshold"; public static final String MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD = "alluxio.master.dora.load.job.total.failure.ratio.threshold"; - public static final String MASTER_DORA_LOAD_JOB_RETRIES = - "alluxio.master.dora.load.job.retries"; + public static final String MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR = + "alluxio.master.dora.load.job.failed.file.list.dir"; + public static final String MASTER_DORA_LOAD_JOB_GET_LATEST_UFS_STATUS_ON_RETRY = + "alluxio.master.dora.load.job.get.latest.ufs.status.on.retry"; public static final String MASTER_DAILY_BACKUP_ENABLED = "alluxio.master.daily.backup.enabled"; public static final String MASTER_DAILY_BACKUP_FILES_RETAINED = diff --git a/dora/core/common/src/main/java/alluxio/metrics/MultiDimensionalMetricsSystem.java b/dora/core/common/src/main/java/alluxio/metrics/MultiDimensionalMetricsSystem.java index 109f713b0c6b..a4cb3e07e521 100644 --- a/dora/core/common/src/main/java/alluxio/metrics/MultiDimensionalMetricsSystem.java +++ b/dora/core/common/src/main/java/alluxio/metrics/MultiDimensionalMetricsSystem.java @@ -98,6 +98,13 @@ public final class MultiDimensionalMetricsSystem { .unit(Unit.BYTES) .register(); + // Distributed load related + public static final Counter DISTRIBUTED_LOAD_FAILURE = Counter.builder() + .name("alluxio_distributed_load_failure") + .help("counter of distributed load subtask failures, labeled by failure reason and whether the failure is final") + .labelNames("reason", "final_attempt") + .register(); + /** * Initialize all the metrics.
*/ diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/AbstractJob.java b/dora/core/server/master/src/main/java/alluxio/master/job/AbstractJob.java index 502d4b8198c6..7e9efcd0de7a 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/AbstractJob.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/AbstractJob.java @@ -153,6 +153,11 @@ public void onTaskSubmitFailure(Task task) { LOG.debug("OnTaskSubmitFailure, retry task size:{}", mRetryTaskList.size()); } + @Override + public void onWorkerUnavailable(T task) { + LOG.warn("Worker became unavailable: {}", task.getMyRunningWorker()); + } + @Override public boolean isRunning() { return mState == JobState.RUNNING || mState == JobState.VERIFYING; diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java b/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java index 90b25975166f..9a052edc4309 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java @@ -15,6 +15,8 @@ import static java.util.Objects.requireNonNull; import alluxio.client.block.stream.BlockWorkerClient; +import alluxio.collections.ConcurrentHashSet; +import alluxio.collections.Pair; import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; import alluxio.exception.runtime.AlluxioRuntimeException; @@ -30,11 +32,13 @@ import alluxio.job.JobDescription; import alluxio.metrics.MetricKey; import alluxio.metrics.MetricsSystem; +import alluxio.metrics.MultiDimensionalMetricsSystem; import alluxio.proto.journal.Journal; import alluxio.scheduler.job.JobState; import alluxio.scheduler.job.Task; import alluxio.underfs.UfsStatus; import alluxio.underfs.UnderFileSystem; +import alluxio.util.CommonUtils; import alluxio.util.FormatUtils; import alluxio.wire.WorkerInfo; @@ -49,14 +53,22 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.collect.EvictingQueue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Queues; import com.google.common.util.concurrent.ListenableFuture; -import io.grpc.Status; +import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedWriter; +import java.io.File; import java.io.FileNotFoundException; +import java.io.FileWriter; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -69,11 +81,10 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; /** @@ -85,47 +96,129 @@ public class DoraLoadJob extends AbstractJob { private static final Logger LOG = LoggerFactory.getLogger(DoraLoadJob.class); public static final String TYPE = "load"; - private static final int RETRY_BLOCK_CAPACITY = 1000; - private static final double RETRY_THRESHOLD = 0.8 * RETRY_BLOCK_CAPACITY; private static final int BATCH_SIZE = 
Configuration.getInt(PropertyKey.JOB_BATCH_SIZE); // Job configurations private final String mLoadPath; - private OptionalLong mBandwidth; - private boolean mUsePartialListing; - private boolean mVerificationEnabled; + private final OptionalLong mBandwidth; + private final boolean mUsePartialListing; + private final boolean mVerificationEnabled; // Job states - private final Queue mRetrySubTasks = new ArrayDeque<>(); - private final Map mRetryCount = new ConcurrentHashMap<>(); - private final Map mFailedFiles = new HashMap<>(); + private final Queue mRetrySubTasksDLQ = new ArrayDeque<>(); + // Only the most recent 1k failures are persisted. + // If more are needed, can turn on debug LOG for this class. + private final Queue> mRecentFailures = + Queues.synchronizedQueue(EvictingQueue.create(1_000)); + private final Queue> mRecentRetries = + Queues.synchronizedQueue(EvictingQueue.create(1_000)); + private final Set mFailedFiles = new ConcurrentHashSet<>(); private final AtomicLong mSkippedBlocksCount = new AtomicLong(); + private final AtomicLong mScannedInodesCount = new AtomicLong(); private final AtomicLong mProcessedInodesCount = new AtomicLong(); private final AtomicLong mLoadedByteCount = new AtomicLong(); private final AtomicLong mTotalByteCount = new AtomicLong(); private final AtomicLong mSkippedByteCount = new AtomicLong(); private final AtomicLong mProcessingSubTasksCount = new AtomicLong(); - //including retry, do accurate stats later. - private final AtomicLong mTotalFailureCount = new AtomicLong(); - private final AtomicLong mCurrentFailureCount = new AtomicLong(); + private final AtomicLong mRetrySubTasksCount = new AtomicLong(); + private final AtomicLong mTotalFinalFailureCount = new AtomicLong(); private Optional mFailedReason = Optional.empty(); - private Iterator mUfsStatusIterator; - private AtomicBoolean mPreparingTasks = new AtomicBoolean(false); + private final AtomicBoolean mPreparingTasks = new AtomicBoolean(false); private final UnderFileSystem mUfs; - private boolean mLoadMetadataOnly; + private final boolean mLoadMetadataOnly; private static final double FAILURE_RATIO_THRESHOLD = Configuration.getDouble( PropertyKey.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_RATIO_THRESHOLD); private static final int FAILURE_COUNT_THRESHOLD = Configuration.getInt( PropertyKey.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD); - private static final int RETRY_ATTEMPT_THRESHOLD = Configuration.getInt( - PropertyKey.MASTER_DORA_LOAD_JOB_RETRIES); + private static final boolean GET_LATEST_UFS_ON_RETRY = Configuration.getBoolean( + PropertyKey.MASTER_DORA_LOAD_JOB_GET_LATEST_UFS_STATUS_ON_RETRY + ); private final boolean mSkipIfExists; private final Optional mFileFilterRegx; private final long mVirtualBlockSize = Configuration.getBytes( PropertyKey.DORA_READ_VIRTUAL_BLOCK_SIZE); - private Iterator mCurrentSubTaskIterator; + private final LoadSubTaskIterator mLoadSubTaskIterator; private final int mNumReplica; + private final long mJobStartTimestamp; + private long mJobFinishTimestamp = -1; + @Nullable private volatile String mFailedFileSavedPath = null; + + class LoadSubTaskIterator implements Iterator { + private LoadSubTaskIterator(Iterator ufsStatusIterator) { + mUfsStatusIterator = ufsStatusIterator; + } + + volatile Set mWorkers = null; + Iterator mCurrentUfsStatusSubTaskIterator = Collections.emptyIterator(); + Iterator mUfsStatusIterator; + + private List generateSubTasksForFile( + UfsStatus ufsStatus, Set workers) { + List subTasks = new ArrayList<>(); + // add load metadata task + 
LoadMetadataSubTask subTask = new LoadMetadataSubTask(ufsStatus, mVirtualBlockSize); + subTasks.add(subTask); + if (!mLoadMetadataOnly && ufsStatus.isFile() + && ufsStatus.asUfsFileStatus().getContentLength() != 0) { + long contentLength = ufsStatus.asUfsFileStatus().getContentLength(); + if (mVirtualBlockSize > 0) { + int numBlocks = (int) (contentLength / mVirtualBlockSize) + 1; + for (int i = 0; i < numBlocks; i++) { + long offset = mVirtualBlockSize * i; + long leftover = contentLength - offset; + subTasks.add(new LoadDataSubTask(ufsStatus, mVirtualBlockSize, offset, + Math.min(leftover, mVirtualBlockSize))); + } + } + else { + subTasks.add(new LoadDataSubTask(ufsStatus, mVirtualBlockSize, 0, contentLength)); + } + } + List subTasksWithWorker = + assignSubtasksToWorkers(subTasks, workers, mNumReplica); + mTotalByteCount.addAndGet( + subTasksWithWorker.stream().mapToLong(LoadSubTask::getLength).sum()); + mProcessingSubTasksCount.addAndGet(subTasksWithWorker.size()); + return subTasksWithWorker; + } + + private List assignSubtasksToWorkers( + List subTasks, Set workers, int numReplica) { + ImmutableList.Builder replicaSubTasks = new ImmutableList.Builder<>(); + for (LoadSubTask subTask : subTasks) { + List pickedWorkers = + mWorkerAssignPolicy.pickWorkers(subTask.asString(), workers, numReplica); + for (int i = 0; i < numReplica; i++) { + replicaSubTasks.add(subTask.copy().setWorkerInfo(pickedWorkers.get(i))); + } + } + return replicaSubTasks.build(); + } + + public void updateWorkerList(Set workers) { + mWorkers = workers; + } + + @Override + public boolean hasNext() { + return mCurrentUfsStatusSubTaskIterator.hasNext() + || mUfsStatusIterator.hasNext(); + } + + @Override + public LoadSubTask next() { + if (mCurrentUfsStatusSubTaskIterator.hasNext()) { + return mCurrentUfsStatusSubTaskIterator.next(); + } + UfsStatus ufsStatus = mUfsStatusIterator.next(); + mScannedInodesCount.incrementAndGet(); + List subTasks = generateSubTasksForFile(ufsStatus, mWorkers); + mCurrentUfsStatusSubTaskIterator = subTasks.listIterator(); + // A ufs status generates at least one subtask. + return mCurrentUfsStatusSubTaskIterator.next(); + } + } /** * Constructor. 
@@ -160,8 +253,9 @@ public DoraLoadJob(String path, Optional user, String jobId, OptionalLon mLoadMetadataOnly = loadMetadataOnly; mSkipIfExists = skipIfExists; mFileFilterRegx = fileFilterRegx; - mUfsStatusIterator = ufsStatusIterator; + mLoadSubTaskIterator = new LoadSubTaskIterator(ufsStatusIterator); mNumReplica = replica; + mJobStartTimestamp = CommonUtils.getCurrentMs(); LOG.info("DoraLoadJob for {} created.", path); } @@ -173,65 +267,49 @@ public DoraLoadJob(String path, Optional user, String jobId, OptionalLon */ private List prepareNextTasks(Set workers) { LOG.debug("Preparing next set of tasks for jobId:{}", mJobId); + mLoadSubTaskIterator.updateWorkerList(workers); int workerNum = workers.size(); ImmutableList.Builder batchBuilder = ImmutableList.builder(); - if (mCurrentSubTaskIterator == null) { - if (mUfsStatusIterator.hasNext()) { - mCurrentSubTaskIterator = initSubTaskIterator(workers); - } - else { - return Collections.emptyList(); - } - } - int i = 0; - int startRetryListSize = mRetrySubTasks.size(); - int numSubTasks = 0; - while (numSubTasks < RETRY_THRESHOLD - && i++ < startRetryListSize && mRetrySubTasks.peek() != null) { - LoadSubTask subTask = mRetrySubTasks.poll(); - String path = subTask.getUfsPath(); - try { - mUfs.getStatus(path); - batchBuilder.add(subTask); - ++numSubTasks; - } catch (IOException | AlluxioRuntimeException e) { - // The previous list or get might contain stale file metadata. - // For example, if a file gets removed before the worker actually loads it, - // the load will fail and the scheduler will retry. - // In such case, a FileNotFoundException might be thrown when we attempt to - // get the file status again, and we simply ignore that file. - if (!(e instanceof FileNotFoundException || e instanceof NotFoundRuntimeException)) { - mRetrySubTasks.offer(subTask); - } - } - } - while (numSubTasks < BATCH_SIZE * workerNum) { - if (!mCurrentSubTaskIterator.hasNext()) { - if (!mUfsStatusIterator.hasNext()) { - break; - } - else { - mCurrentSubTaskIterator = initSubTaskIterator(workers); + + // numSubTasks is advanced once per added subtask by the loop's update clause. + for (int numSubTasks = 0; numSubTasks < BATCH_SIZE * workerNum; ++numSubTasks) { + if (mLoadSubTaskIterator.hasNext()) { + batchBuilder.add(mLoadSubTaskIterator.next()); + } else if (!mRetrySubTasksDLQ.isEmpty()) { + LoadSubTask subTaskToRetry = mRetrySubTasksDLQ.poll(); + String path = subTaskToRetry.getUfsPath(); + try { + if (GET_LATEST_UFS_ON_RETRY) { + mUfs.getStatus(path); + } + batchBuilder.add(subTaskToRetry); + } catch (IOException | AlluxioRuntimeException e) { + // The previous list or get might contain stale file metadata. + // For example, if a file gets removed before the worker actually loads it, + // the load will fail and the scheduler will retry. + // In such case, a FileNotFoundException might be thrown when we attempt to + // get the file status again, and we simply ignore that file.
+ if (!(e instanceof FileNotFoundException || e instanceof NotFoundRuntimeException)) { + addFileFailure(subTaskToRetry, FailureReason.MEMBERSHIP_CHANGED, + "Failed to get UfsStatus on retry"); + } } + } else { + break; } - batchBuilder.add(mCurrentSubTaskIterator.next()); - numSubTasks++; } + ImmutableList subTasks = batchBuilder.build(); Map workerToTaskMap = aggregateSubTasks(subTasks); if (workerToTaskMap.isEmpty()) { return Collections.unmodifiableList(new ArrayList<>()); } - List tasks = workerToTaskMap.values().stream().collect(Collectors.toList()); + List tasks = new ArrayList<>(workerToTaskMap.values()); LOG.debug("prepared tasks:{}", tasks); return tasks; } - private Iterator initSubTaskIterator(Set workers) { - - return createSubTasks(mUfsStatusIterator.next(), workers).listIterator(); - } - private Map aggregateSubTasks(List subTasks) { Map workerToTaskMap = new HashMap<>(); for (LoadSubTask subtask : subTasks) { @@ -249,46 +327,6 @@ private Map aggregateSubTasks(List subTas return workerToTaskMap; } - private List createSubTasks(UfsStatus ufsStatus, Set workers) { - List subTasks = new ArrayList<>(); - // add load metadata task - LoadMetadataSubTask subTask = new LoadMetadataSubTask(ufsStatus, mVirtualBlockSize); - subTasks.add(subTask); - if (!mLoadMetadataOnly && ufsStatus.isFile() - && ufsStatus.asUfsFileStatus().getContentLength() != 0) { - long contentLength = ufsStatus.asUfsFileStatus().getContentLength(); - if (mVirtualBlockSize > 0) { - int numBlocks = (int) (contentLength / mVirtualBlockSize) + 1; - for (int i = 0; i < numBlocks; i++) { - long offset = mVirtualBlockSize * i; - long leftover = contentLength - offset; - subTasks.add(new LoadDataSubTask(ufsStatus, mVirtualBlockSize, offset, - Math.min(leftover, mVirtualBlockSize))); - } - } - else { - subTasks.add(new LoadDataSubTask(ufsStatus, mVirtualBlockSize, 0, contentLength)); - } - } - List subTasksWithWorker = assignSubtasksToWorkers(subTasks, workers, mNumReplica); - mTotalByteCount.addAndGet(subTasksWithWorker.stream().mapToLong(LoadSubTask::getLength).sum()); - mProcessingSubTasksCount.addAndGet(subTasksWithWorker.size()); - return subTasksWithWorker; - } - - private List assignSubtasksToWorkers(List subTasks, - Set workers, int numReplica) { - ImmutableList.Builder replicaSubTasks = new ImmutableList.Builder<>(); - for (LoadSubTask subTask : subTasks) { - List pickedWorkers = - mWorkerAssignPolicy.pickWorkers(subTask.asString(), workers, numReplica); - for (int i = 0; i < numReplica; i++) { - replicaSubTasks.add(subTask.copy().setWorkerInfo(pickedWorkers.get(i))); - } - } - return replicaSubTasks.build(); - } - /** * Get load file path. 
* @return file path @@ -316,14 +354,50 @@ public OptionalLong getBandwidth() { */ @Override public void failJob(AlluxioRuntimeException reason) { + mJobFinishTimestamp = CommonUtils.getCurrentMs(); setJobState(JobState.FAILED, true); mFailedReason = Optional.of(reason); + // Move all pending retry subtask to failed subtask set + while (!mRetrySubTasksDLQ.isEmpty()) { + addFileFailure( + mRetrySubTasksDLQ.poll(), + FailureReason.CANCELLED, + "Retry cancelled due to job failure"); + } JOB_LOAD_FAIL.inc(); LOG.info("Load Job {} fails with status: {}", mJobId, this); + persistFailedFilesList(); + } + + private void persistFailedFilesList() { + LOG.info("Starting persisting failed files..."); + String fileListDir = + Configuration.getString(PropertyKey.MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR); + String startTime = new SimpleDateFormat("yyyy_MM_dd_HH:mm:ss").format(mStartTime); + String fileName = + (mLoadPath + "_" + startTime).replaceAll("[^a-zA-Z0-9-_\\.]", "_"); + try { + Files.createDirectories(Paths.get(fileListDir)); + } catch (Exception e) { + LOG.warn("Failed to create directory to store failed file list {}", fileListDir, e); + return; + } + File output = new File(fileListDir, fileName); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(output))) { + for (String path : mFailedFiles) { + writer.write(path); + writer.newLine(); + } + LOG.info("Persisted the failed file list to {} successfully", output.getAbsolutePath()); + mFailedFileSavedPath = output.getAbsolutePath(); + } catch (Exception e) { + LOG.warn("Failed to persist the failed file list to {}", fileName, e); + } } @Override public void setJobSuccess() { + mJobFinishTimestamp = CommonUtils.getCurrentMs(); setJobState(JobState.SUCCEEDED, true); JOB_LOAD_SUCCESS.inc(); LOG.info("Load Job {} succeeds with status {}", mJobId, this); @@ -340,42 +414,44 @@ public void addLoadedBytes(long bytes) { /** * Add files to retry. - * @param type the error type + * @param reason the failure reason * @param message the error message * @param subTask subTask to be retried - * @return true */ @VisibleForTesting - public boolean addSubTaskToRetry(LoadSubTask subTask, String type, String message) { + private void addSubTaskToRetryOrFail(LoadSubTask subTask, FailureReason reason, String message) { LOG.debug("Retry file {}", subTask); - int currentErrorCount = mRetryCount.getOrDefault(subTask, 0); - if (currentErrorCount >= RETRY_ATTEMPT_THRESHOLD) { - addFileFailure(subTask.getUfsPath(), type, message); - mRetryCount.remove(subTask); - return true; + if (subTask.isRetry() || !isHealthy()) { + addFileFailure(subTask, reason, message); + return; } - mRetryCount.put(subTask, mRetryCount.getOrDefault(subTask, 0) + 1); - mRetrySubTasks.offer(subTask); - mTotalFailureCount.incrementAndGet(); + subTask.setRetry(true); + mRetrySubTasksDLQ.offer(subTask); + mRetrySubTasksCount.incrementAndGet(); + mRecentRetries.add(new Pair<>( + subTask, format("Reason: %s, message: %s", reason.name(), message))); + MultiDimensionalMetricsSystem.DISTRIBUTED_LOAD_FAILURE.labelValues( + reason.name(), Boolean.toString(false)).inc(); LOAD_FAIL_COUNT.inc(); - return true; } /** * Add failed files. 
- * @param fileUfsPath - * @param message - * @param type + * @param subTask the load subtask + * @param reason the failure reason + * @param message the error message */ - @VisibleForTesting - public void addFileFailure(String fileUfsPath, String type, String message) { + private void addFileFailure(LoadSubTask subTask, FailureReason reason, String message) { // When multiple blocks of the same file failed to load, from user's perspective, // it's not hugely important what are the reasons for each specific failure, // if they are different, so we will just keep the first one. - mFailedFiles.put(fileUfsPath, - format("Status code: %s, message: %s", type, message)); - LOAD_FAIL_COUNT.inc(); + mFailedFiles.add(subTask.getUfsPath()); + mRecentFailures.add(new Pair<>( + subTask, format("Reason: %s, message: %s", reason.name(), message))); + mTotalFinalFailureCount.incrementAndGet(); + MultiDimensionalMetricsSystem.DISTRIBUTED_LOAD_FAILURE.labelValues( + reason.name(), Boolean.toString(true)).inc(); } @Override @@ -402,7 +478,8 @@ public int hashCode() { @Override public boolean isHealthy() { - long totalFailureCount = mTotalFailureCount.get(); + // Tasks to retry are considered as "failed" tasks when calculating if the job is healthy. + long totalFailureCount = mTotalFinalFailureCount.get() + mRetrySubTasksDLQ.size(); if (FAILURE_RATIO_THRESHOLD >= 1.0 || FAILURE_COUNT_THRESHOLD < 0) { return true; } @@ -413,8 +490,7 @@ public boolean isHealthy() { @Override public boolean isCurrentPassDone() { - return !mUfsStatusIterator.hasNext() - && !mCurrentSubTaskIterator.hasNext() && mRetrySubTasks.isEmpty() + return !mLoadSubTaskIterator.hasNext() && mRetrySubTasksDLQ.isEmpty() && mRetryTaskList.isEmpty(); } @@ -459,18 +535,18 @@ public String toString() { .add("Bandwidth", mBandwidth) .add("UsePartialListing", mUsePartialListing) .add("VerificationEnabled", mVerificationEnabled) - .add("RetrySubTasks", mRetrySubTasks) + .add("RetrySubTasks", mRetrySubTasksDLQ) .add("FailedFiles", mFailedFiles) .add("StartTime", mStartTime) .add("SkippedFileCount", mSkippedBlocksCount) .add("ProcessedInodesCount", mProcessedInodesCount) + .add("RetryTaskCount", mRetrySubTasksCount) .add("LoadedByteCount", mLoadedByteCount) - .add("TotalFailureCount", mTotalFailureCount) + .add("TotalFailureCount", mTotalFinalFailureCount) .add("SkippedByteCount", mSkippedByteCount) .add("State", mState) .add("BatchSize", BATCH_SIZE) .add("FailedReason", mFailedReason) - .add("UfsStatusIterator", mUfsStatusIterator) .add("EndTime", mEndTime) .toString(); } @@ -509,7 +585,7 @@ public long getDurationInSec() { public boolean processResponse(DoraLoadTask doraLoadTask) { try { long totalLoadedBytes = doraLoadTask.getSubTasks().stream() - .map((it) -> (it.getLength())) + .map(LoadSubTask::getLength) .reduce(Long::sum) .orElse(0L); // what if timeout ? 
job needs to proactively check or task needs to be aware @@ -521,19 +597,18 @@ public boolean processResponse(DoraLoadTask doraLoadTask) { if (failure.getSubtask().hasLoadDataSubtask()) { totalLoadedBytes -= failure.getSubtask().getLoadDataSubtask().getLength(); } - String status = Status.fromCodeValue(failure.getCode()).toString(); LoadSubTask subTask = LoadSubTask.from(failure, mVirtualBlockSize); - if (!isHealthy() || !failure.getRetryable() || !addSubTaskToRetry(subTask, status, - failure.getMessage())) { - addFileFailure( - subTask.getUfsPath(), status, failure.getMessage()); + if (failure.getRetryable()) { + addSubTaskToRetryOrFail(subTask, FailureReason.WORKER_FAILED, failure.getMessage()); + } else { + addFileFailure(subTask, FailureReason.WORKER_FAILED, failure.getMessage()); } } } - int totalLoadedInodes = doraLoadTask.getSubTasks().stream() - .filter(LoadSubTask::isLoadMetadata).collect(Collectors.toList()).size() - - response.getFailuresList().stream().filter(i -> i.getSubtask().hasLoadMetadataSubtask()) - .collect(Collectors.toList()).size(); + int totalLoadedInodes = (int) doraLoadTask.getSubTasks().stream() + .filter(LoadSubTask::isLoadMetadata).count() - (int) response.getFailuresList().stream() + .filter(i -> i.getSubtask().hasLoadMetadataSubtask()).count(); if (!mLoadMetadataOnly) { addLoadedBytes(totalLoadedBytes - response.getBytesSkipped()); LOAD_FILE_SIZE.inc(totalLoadedBytes); @@ -551,28 +626,35 @@ public boolean processResponse(DoraLoadTask doraLoadTask) { LOG.warn("exception when trying to get load response.", cause); for (LoadSubTask subTask : doraLoadTask.getSubTasks()) { AlluxioRuntimeException exception = AlluxioRuntimeException.from(cause); - if (isHealthy()) { - addSubTaskToRetry(subTask, exception.getStatus().toString(), exception.getMessage()); - } else { - addFileFailure(subTask.getUfsPath(), - exception.getStatus().toString(), exception.getMessage()); - } + addSubTaskToRetryOrFail(subTask, FailureReason.WORKER_RPC_FAILED, + exception.getStatus() + ":" + exception.getMessage()); } return false; } catch (CancellationException e) { LOG.warn("[DistributedLoad] Task get canceled and will retry.", e); - doraLoadTask.getSubTasks().forEach(it -> addSubTaskToRetry(it, "CANCELLED", e.getMessage())); + doraLoadTask.getSubTasks().forEach(it -> addSubTaskToRetryOrFail( + it, FailureReason.CANCELLED, e.getMessage())); return true; } catch (InterruptedException e) { - doraLoadTask.getSubTasks().forEach(it -> addSubTaskToRetry(it, "ABORTED", e.getMessage())); + doraLoadTask.getSubTasks().forEach(it -> addSubTaskToRetryOrFail( + it, FailureReason.INTERRUPTED, e.getMessage())); Thread.currentThread().interrupt(); // We don't count InterruptedException as task failure return true; } } + @Override + public void onWorkerUnavailable(DoraLoadTask task) { + LOG.warn("Worker became unavailable: {}", task.getMyRunningWorker()); + for (LoadSubTask subTask: task.getSubTasks()) { + addSubTaskToRetryOrFail( + subTask, FailureReason.MEMBERSHIP_CHANGED, "Worker became unavailable"); + } + } + @Override public boolean hasFailure() { return !mFailedFiles.isEmpty(); @@ -657,23 +739,31 @@ private static class LoadProgressReport { private final boolean mVerificationEnabled; private final long mSkippedByteCount; private final long mLoadedByteCount; + private final long mScannedInodesCount; private final long mProcessedInodesCount; private final Long mTotalByteCount; private final Long mThroughput; - private final double mFailurePercentage; + private final double mFailureFilesPercentage; +
private final double mFailureSubTasksPercentage; + private final double mRetrySubTasksPercentage; + private final AlluxioRuntimeException mFailureReason; private final long mFailedFileCount; - private final Map mFailedFilesWithReasons; + private List> mRecentFailedSubtasksWithReasons = null; + private List> mRecentRetryingSubtasksWithReasons = null; private final boolean mSkipIfExists; private final boolean mMetadataOnly; + private String mRunningStage; + private final int mRetryDeadLetterQueueSize; + private final long mTimeElapsed; + @Nullable private final String mFailedFileSavedPath; /** * Constructor. * @param job the job * @param verbose verbose */ - public LoadProgressReport(DoraLoadJob job, boolean verbose) - { + public LoadProgressReport(DoraLoadJob job, boolean verbose) { mVerbose = verbose; mJobState = job.mState; mBandwidth = job.mBandwidth.isPresent() ? job.mBandwidth.getAsLong() : null; @@ -682,30 +772,47 @@ public LoadProgressReport(DoraLoadJob job, boolean verbose) mLoadedByteCount = job.mLoadedByteCount.get(); if (!job.mUsePartialListing) { mTotalByteCount = job.mTotalByteCount.get(); - } - else { + } else { mTotalByteCount = null; } long duration = job.getDurationInSec(); if (duration > 0) { mThroughput = job.mLoadedByteCount.get() / duration; - } - else { + } else { mThroughput = null; } - mFailurePercentage = - ((double) (job.mTotalFailureCount.get()) - / (mProcessedInodesCount)) * 100; + mFailureFilesPercentage = + ((double) (job.mFailedFiles.size()) + / (job.mScannedInodesCount.get())) * 100; + mFailureSubTasksPercentage = + ((double) (job.mTotalFinalFailureCount.get()) + / (job.mProcessingSubTasksCount.get())) * 100; + mRetrySubTasksPercentage = + ((double) (job.mRetrySubTasksCount.get()) + / (job.mProcessingSubTasksCount.get())) * 100; + mScannedInodesCount = job.mScannedInodesCount.get(); mFailureReason = job.mFailedReason.orElse(null); mFailedFileCount = job.mFailedFiles.size(); - if (verbose && mFailedFileCount > 0) { - mFailedFilesWithReasons = job.mFailedFiles; - } else { - mFailedFilesWithReasons = null; + if (verbose) { + if (!job.mRecentFailures.isEmpty()) { + mRecentFailedSubtasksWithReasons = new ArrayList<>(job.mRecentFailures); + } + if (!job.mRecentRetries.isEmpty()) { + mRecentRetryingSubtasksWithReasons = new ArrayList<>(job.mRecentRetries); + } } mSkippedByteCount = job.mSkippedByteCount.get(); mSkipIfExists = job.mSkipIfExists; mMetadataOnly = job.mLoadMetadataOnly; + mRunningStage = ""; + if (mJobState == JobState.RUNNING) { + mRunningStage = job.mLoadSubTaskIterator.hasNext() ? "LOADING" : "RETRYING"; + } + mRetryDeadLetterQueueSize = job.mRetrySubTasksDLQ.size(); + mTimeElapsed = + (job.mJobFinishTimestamp == -1 ? CommonUtils.getCurrentMs() : job.mJobFinishTimestamp) + - job.mJobStartTimestamp; + mFailedFileSavedPath = job.mFailedFileSavedPath; } public String getReport(JobProgressReportFormat format) @@ -727,18 +834,31 @@ private String getTextReport() { format("\tSettings:\tbandwidth: %s\tverify: %s\tmetadata-only: %s%n", mBandwidth == null ? "unlimited" : mBandwidth, mVerificationEnabled, mMetadataOnly)); + progress.append(format("\tTime Elapsed: %s%n", + DurationFormatUtils.formatDuration(mTimeElapsed, "HH:mm:ss"))); progress.append(format("\tJob State: %s%s%n", mJobState, mFailureReason == null ? 
"" : format( " (%s: %s)", mFailureReason.getClass().getName(), mFailureReason.getMessage()))); + if (mJobState == JobState.RUNNING) { + progress.append(format("\tStage: %s%n", mRunningStage)); + } if (mVerbose && mFailureReason != null) { for (StackTraceElement stack : mFailureReason.getStackTrace()) { progress.append(format("\t\t%s%n", stack.toString())); } } + progress.append(format("\tInodes Scanned: %d%n", mScannedInodesCount)); progress.append(format("\tInodes Processed: %d%n", mProcessedInodesCount)); + if (mSkipIfExists) { + progress.append(format("\tBytes Skipped: %s%s%n", + FormatUtils.getSizeFromBytes(mSkippedByteCount), + mTotalByteCount == null + ? "" : format(" out of %s", FormatUtils.getSizeFromBytes(mTotalByteCount)))); + } + if (!mMetadataOnly) { progress.append(format("\tBytes Loaded: %s%s%n", FormatUtils.getSizeFromBytes(mLoadedByteCount), @@ -748,18 +868,24 @@ private String getTextReport() { progress.append(format("\tThroughput: %s/s%n", FormatUtils.getSizeFromBytes(mThroughput))); } - progress.append(format("\tFailure rate: %.2f%%%n", mFailurePercentage)); - } - if (mSkipIfExists) { - progress.append(format("\tBytes Skipped: %s%s%n", - FormatUtils.getSizeFromBytes(mSkippedByteCount), - mTotalByteCount == null - ? "" : format(" out of %s", FormatUtils.getSizeFromBytes(mTotalByteCount)))); } + progress.append(format("\tFile Failure rate: %.2f%%%n", mFailureFilesPercentage)); + progress.append(format("\tSubtask Failure rate: %.2f%%%n", mFailureSubTasksPercentage)); progress.append(format("\tFiles Failed: %s%n", mFailedFileCount)); - if (mVerbose && mFailedFilesWithReasons != null) { - mFailedFilesWithReasons.forEach((fileName, reason) -> - progress.append(format("\t\t%s: %s%n", fileName, reason))); + if (mVerbose && mRecentFailedSubtasksWithReasons != null) { + progress.append(format("\tRecent failed subtasks: %n")); + mRecentFailedSubtasksWithReasons.forEach(pair -> + progress.append(format("\t\t%s: %s%n", pair.getFirst(), pair.getSecond()))); + progress.append(format("\tRecent retrying subtasks: %n")); + mRecentRetryingSubtasksWithReasons.forEach(pair -> + progress.append(format("\t\t%s: %s%n", pair.getFirst(), pair.getSecond()))); + } + + progress.append(format("\tSubtask Retry rate: %.2f%%%n", mRetrySubTasksPercentage)); + progress.append( + format("\tSubtasks on Retry Dead Letter Queue: %s%n", mRetryDeadLetterQueueSize)); + if (mFailedFileSavedPath != null) { + progress.append(format("\tFailed files saved to: %s%n", mFailedFileSavedPath)); } return progress.toString(); } @@ -777,6 +903,15 @@ private String getJsonReport() { } // metrics + enum FailureReason { + CANCELLED, + INTERRUPTED, + MEMBERSHIP_CHANGED, + WORKER_FAILED, + WORKER_RPC_FAILED, + WORKER_NOT_REACHABLE; + } + public static final Counter JOB_LOAD_SUCCESS = MetricsSystem.counter(MetricKey.MASTER_JOB_LOAD_SUCCESS.getName()); public static final Counter JOB_LOAD_FAIL = diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/LoadSubTask.java b/dora/core/server/master/src/main/java/alluxio/master/job/LoadSubTask.java index aacdcecb42b4..834ab0e1ca4d 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/LoadSubTask.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/LoadSubTask.java @@ -16,6 +16,8 @@ import alluxio.underfs.UfsStatus; import alluxio.wire.WorkerInfo; +import com.google.common.base.MoreObjects; + /** * Load sub task. It's either load metadata or load data. 
*/ @@ -24,6 +26,8 @@ public abstract class LoadSubTask implements ShardKey { protected ShardKey mHashKey; private WorkerInfo mWorkerInfo; + private boolean mIsRetry; + LoadSubTask(UfsStatus ufsStatus) { mUfsStatus = ufsStatus; } @@ -68,6 +72,20 @@ public LoadSubTask setWorkerInfo(WorkerInfo worker) { return this; } + /** + * @return if the subtask is a retry task (not being executed first time) + */ + public boolean isRetry() { + return mIsRetry; + } + + /** + * @param retry true if this is a retry task + */ + public void setRetry(boolean retry) { + mIsRetry = retry; + } + /** * @param loadFailure the subtask failure from worker * @param virtualBlockSize the virtual block size @@ -85,4 +103,13 @@ public static LoadSubTask from(LoadFailure loadFailure, long virtualBlockSize) { failure.getLoadDataSubtask().getOffsetInFile(), failure.getLoadDataSubtask().getLength()); } } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("UfsPath", getUfsPath()) + .add("ShardingKey", mHashKey.asString()) + .add("Worker", getWorkerInfo().getAddress().toString()) + .toString(); + } } diff --git a/dora/core/server/master/src/main/java/alluxio/master/scheduler/MembershipManagerWorkerProvider.java b/dora/core/server/master/src/main/java/alluxio/master/scheduler/MembershipManagerWorkerProvider.java index d023aa1a897b..243ba7895fba 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/scheduler/MembershipManagerWorkerProvider.java +++ b/dora/core/server/master/src/main/java/alluxio/master/scheduler/MembershipManagerWorkerProvider.java @@ -13,6 +13,8 @@ import alluxio.client.block.stream.BlockWorkerClient; import alluxio.client.file.FileSystemContext; +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; import alluxio.exception.runtime.AlluxioRuntimeException; import alluxio.membership.MembershipManager; import alluxio.resource.CloseableResource; @@ -28,6 +30,7 @@ public class MembershipManagerWorkerProvider implements WorkerProvider { private final MembershipManager mMembershipManager; private final FileSystemContext mContext; + private final boolean mEnableDynamicHashRing; /** * CTOR for MembershipManagerWorkerProvider. 
@@ -38,11 +41,22 @@ public MembershipManagerWorkerProvider(MembershipManager membershipMgr, FileSystemContext context) { mMembershipManager = membershipMgr; mContext = context; + if (context != null && context.getClusterConf() != null) { + mEnableDynamicHashRing = + context.getClusterConf() + .getBoolean(PropertyKey.USER_DYNAMIC_CONSISTENT_HASH_RING_ENABLED); + } else { + mEnableDynamicHashRing = Configuration.global() + .getBoolean(PropertyKey.USER_DYNAMIC_CONSISTENT_HASH_RING_ENABLED); + } } @Override public List getWorkerInfos() { try { + if (mEnableDynamicHashRing) { + return getLiveWorkerInfos(); + } return mMembershipManager.getAllMembers(); } catch (IOException ex) { throw AlluxioRuntimeException.from(ex); diff --git a/dora/core/server/master/src/main/java/alluxio/master/scheduler/Scheduler.java b/dora/core/server/master/src/main/java/alluxio/master/scheduler/Scheduler.java index 4700ce71307b..7771924fec3f 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/scheduler/Scheduler.java +++ b/dora/core/server/master/src/main/java/alluxio/master/scheduler/Scheduler.java @@ -527,17 +527,18 @@ public void kickStartTasks() { tasksQ.size()); CloseableResource blkWorkerClientResource = mActiveWorkers.get(workerInfo); - if (blkWorkerClientResource == null) { - LOG.warn("Didn't find corresponding BlockWorkerClient for workerInfo:{}", - workerInfo); - return; - } Task task = tasksQ.peek(); // only make sure 1 task is running at the time if (task == null || task.getResponseFuture() != null) { LOG.debug("head task is {}", (task == null) ? "NULL" : "already running"); return; } + if (blkWorkerClientResource == null) { + LOG.warn("Didn't find corresponding BlockWorkerClient for workerInfo:{}", + workerInfo); + task.getJob().onWorkerUnavailable(task); + return; + } task.execute(blkWorkerClientResource.get(), workerInfo.mWorkerInfo); task.getResponseFuture().addListener(() -> { Job job = task.getJob(); diff --git a/dora/job/common/src/main/java/alluxio/scheduler/job/Job.java b/dora/job/common/src/main/java/alluxio/scheduler/job/Job.java index e9e1d943a195..4e8a9da4141f 100644 --- a/dora/job/common/src/main/java/alluxio/scheduler/job/Job.java +++ b/dora/job/common/src/main/java/alluxio/scheduler/job/Job.java @@ -126,6 +126,12 @@ public interface Job> { */ void onTaskSubmitFailure(Task task); + /** + * Triggered when a worker becomes unavailable, e.g. due to membership changes. + * @param task the task that was assigned to the now-unavailable worker + */ + void onWorkerUnavailable(T task); + /** * @return job journal entry */
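Below is a minimal usage sketch of the new knobs introduced in this patch; it is illustrative only and not part of the change. It assumes the static alluxio.conf.Configuration.set(PropertyKey, Object) facade used elsewhere in this codebase, and the directory path is a placeholder.

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

public class DistributedLoadRetryConfigExample {
  public static void main(String[] args) {
    // Cap how many subtasks may fail for good before the whole load job is failed.
    // Failed subtasks are kept in memory (roughly 0.5KB each on average), so keep this bounded.
    Configuration.set(PropertyKey.MASTER_DORA_LOAD_JOB_TOTAL_FAILURE_COUNT_THRESHOLD, 100_000);

    // Re-fetch the UFS status before retrying a subtask: slower retries, but safer when
    // files are updated while the load is in flight.
    Configuration.set(PropertyKey.MASTER_DORA_LOAD_JOB_GET_LATEST_UFS_STATUS_ON_RETRY, true);

    // Directory where the failed file list is written when a load job fails (placeholder path).
    Configuration.set(PropertyKey.MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR,
        "/opt/alluxio/distributed_load");
  }
}

With these settings, a failed subtask is retried at most once via the retry dead-letter queue; the new alluxio_distributed_load_failure counter records each failure with a reason label (the FailureReason enum name) and a final_attempt label that is false for queued retries and true for failures added to the failed file set.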