Fix job-master memory leak when submitting distributed jobs
Cherry-pick of existing commit.
orig-pr: #18639
orig-commit: a4ec456
orig-commit-author: Echo🌟 <[email protected]>

pr-link: #18651
change-id: cid-d4e5853a1818a22c8a0411a27bfe1141c6f24ebd
alluxio-bot authored Jul 8, 2024
1 parent 42b5566 commit ffa4bc7
Showing 7 changed files with 155 additions and 13 deletions.
8 changes: 8 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
@@ -7357,6 +7357,12 @@ public String toString() {
.setDefaultValue("60sec")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_JOB_TRACE_RETENTION_TIME =
durationBuilder(Name.JOB_MASTER_JOB_TRACE_RETENTION_TIME)
.setDescription("The length of time the client can trace the submitted job.")
.setDefaultValue("1d")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_JOB_CAPACITY =
longBuilder(Name.JOB_MASTER_JOB_CAPACITY)
.setDescription("The total possible number of available job statuses in the job master. "
@@ -9191,6 +9197,8 @@ public static final class Name {
"alluxio.job.master.finished.job.purge.count";
public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME =
"alluxio.job.master.finished.job.retention.time";
public static final String JOB_MASTER_JOB_TRACE_RETENTION_TIME =
"alluxio.job.master.job.trace.retention.time";
public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity";
public static final String JOB_MASTER_MASTER_HEARTBEAT_INTERVAL =
"alluxio.job.master.master.heartbeat.interval";
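
The new key defaults to one day and is read on the job master as a duration in milliseconds. A minimal sketch of resolving it programmatically, assuming an Alluxio classpath on which alluxio.conf.Configuration has been initialized with site defaults (the wrapper class and printed message are illustrative only, not part of this commit):

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

public class TraceRetentionExample {
  public static void main(String[] args) {
    // Resolves to 86400000 ms (1d) unless alluxio.job.master.job.trace.retention.time is overridden.
    long retentionMs = Configuration.getMs(PropertyKey.JOB_MASTER_JOB_TRACE_RETENTION_TIME);
    System.out.println("Job trace retention: " + retentionMs + " ms");
  }
}
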
7 changes: 7 additions & 0 deletions job/common/src/main/java/alluxio/job/wire/Status.java
@@ -27,6 +27,13 @@ public boolean isFinished() {
return this.equals(CANCELED) || this.equals(FAILED) || this.equals(COMPLETED);
}

/**
* @return whether this status represents a Completed state
*/
public boolean isCompleted() {
return this.equals(COMPLETED);
}

/**
* @return proto representation of the status
*/
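
The new helper narrows the existing isFinished() check, which treats CANCELED, FAILED, and COMPLETED as terminal, down to the COMPLETED state alone. A minimal sketch of the distinction, assuming alluxio.job.wire.Status is on the classpath (the wrapper class is illustrative only):

import alluxio.job.wire.Status;

public class StatusCheckExample {
  public static void main(String[] args) {
    // isFinished() is true for CANCELED, FAILED, and COMPLETED.
    System.out.println(Status.FAILED.isFinished());     // true
    System.out.println(Status.FAILED.isCompleted());    // false
    // isCompleted() is true only for COMPLETED.
    System.out.println(Status.COMPLETED.isCompleted()); // true
  }
}
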
2 changes: 1 addition & 1 deletion job/server/src/main/java/alluxio/master/job/JobMaster.java
@@ -211,7 +211,7 @@ public JobMaster(MasterContext masterContext, FileSystem filesystem,
mWorkerHealth = new ConcurrentHashMap<>();

mCmdJobTracker = new CmdJobTracker(
fsContext, this);
fsContext, this, mPlanTracker);

MetricsSystem.registerGaugeIfAbsent(
MetricKey.MASTER_JOB_COUNT.getName(),
20 changes: 17 additions & 3 deletions job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java
@@ -35,13 +35,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@@ -86,7 +86,7 @@ public class PlanTracker {
private final SortedSet<PlanInfo> mFailed;

/** A FIFO queue used to track jobs which have status {@link Status#isFinished()} as true. */
private final LinkedBlockingQueue<PlanInfo> mFinished;
private final LinkedList<PlanInfo> mFinished;

private final WorkflowTracker mWorkflowTracker;

@@ -114,7 +114,7 @@ public PlanTracker(long capacity, long retentionMs,
}
return Long.signum(right.getId() - left.getId());
}));
mFinished = new LinkedBlockingQueue<>();
mFinished = new LinkedList<>();
mWorkflowTracker = workflowTracker;
}

@@ -300,6 +300,20 @@ public Set<Long> findJobs(String name, List<Status> statusList) {
.map(Map.Entry::getKey).collect(Collectors.toSet());
}

/**
* Removes expired jobs from the PlanTracker.
* @param jobIds the ids of the jobs to remove
*/
public void removeJobs(List<Long> jobIds) {
mWorkflowTracker.cleanup(jobIds);
for (Long jobId : jobIds) {
PlanInfo removedPlanInfo = mCoordinators.get(jobId).getPlanInfo();
mCoordinators.remove(jobId);
mFailed.remove(removedPlanInfo);
mFinished.remove(removedPlanInfo);
}
}

private void checkActiveSetReplicaJobs(JobConfig jobConfig) throws JobDoesNotExistException {
if (jobConfig instanceof SetReplicaConfig) {
Set<Pair<String, Long>> activeJobs = mCoordinators.values().stream()
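
removeJobs has to drop a finished plan from every structure that still references it (mCoordinators, mFailed, mFinished); any lingering reference keeps the PlanInfo alive and the leak persists. A simplified, self-contained model of that bookkeeping using plain JDK collections in place of the tracker's internals (all names below are illustrative, not Alluxio APIs):

import java.util.*;

public class RemoveJobsModel {
  public static void main(String[] args) {
    Map<Long, String> coordinators = new HashMap<>(); // stands in for mCoordinators
    SortedSet<Long> failed = new TreeSet<>();         // stands in for mFailed
    LinkedList<Long> finished = new LinkedList<>();   // stands in for mFinished
    coordinators.put(1L, "plan-1");
    finished.add(1L);

    // Removing an expired job clears it from every structure that still references it.
    List<Long> toRemove = Collections.singletonList(1L);
    for (Long jobId : toRemove) {
      coordinators.remove(jobId);
      failed.remove(jobId);
      finished.remove(jobId);
    }
    System.out.println(coordinators.isEmpty() && finished.isEmpty()); // true
  }
}

The real method additionally asks the WorkflowTracker to clean up the same ids before touching the plan-level structures.
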
job/server/src/main/java/alluxio/master/job/tracker/CmdJobTracker.java
@@ -13,8 +13,11 @@

import alluxio.AlluxioURI;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.JobDoesNotExistException;
import alluxio.grpc.OperationType;
import alluxio.job.CmdConfig;
import alluxio.job.cmd.load.LoadCliConfig;
import alluxio.job.cmd.migrate.MigrateCliConfig;
@@ -24,26 +27,31 @@
import alluxio.job.wire.Status;
import alluxio.master.job.JobMaster;
import alluxio.master.job.common.CmdInfo;
import alluxio.master.job.plan.PlanTracker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;

/**
* CmdJobTracker to schedule a Cmd job to run.
*/
@ThreadSafe
public class CmdJobTracker {
public class CmdJobTracker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CmdJobTracker.class);
private final Map<Long, CmdInfo> mInfoMap = new ConcurrentHashMap<>(0, 0.95f,
Math.max(8, 2 * Runtime.getRuntime().availableProcessors()));
@@ -52,18 +60,29 @@ public class CmdJobTracker {
private final PersistRunner mPersistRunner;
protected FileSystemContext mFsContext;
public static final String DELIMITER = ",";
private final ScheduledExecutorService mScheduleCleanExecutor;
private final Long mTraceRetentionTime;

private final PlanTracker mPlanTracker;

/**
* Create a new instance of {@link CmdJobTracker}.
* @param fsContext filesystem context
* @param jobMaster the job master
* @param planTracker the planTracker
*/
public CmdJobTracker(FileSystemContext fsContext,
JobMaster jobMaster) {
JobMaster jobMaster, PlanTracker planTracker) {
mFsContext = fsContext;
mDistLoadCliRunner = new DistLoadCliRunner(mFsContext, jobMaster);
mMigrateCliRunner = new MigrateCliRunner(mFsContext, jobMaster);
mPersistRunner = new PersistRunner(mFsContext, jobMaster);
mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor();
mScheduleCleanExecutor.scheduleAtFixedRate(this::
cleanExpiredJobInfos, 60, 600, TimeUnit.SECONDS);
mTraceRetentionTime = Configuration.getMs(
PropertyKey.JOB_MASTER_JOB_TRACE_RETENTION_TIME);
mPlanTracker = planTracker;
}

/**
@@ -72,15 +91,25 @@ public CmdJobTracker(FileSystemContext fsContext,
* @param distLoadCliRunner DistributedLoad runner
* @param migrateCliRunner DistributedCopy runner
* @param persistRunner Persist runner
* @param retentionTime job retention time
* @param planTracker the planTracker
*/
public CmdJobTracker(FileSystemContext fsContext,
DistLoadCliRunner distLoadCliRunner,
MigrateCliRunner migrateCliRunner,
PersistRunner persistRunner) {
PersistRunner persistRunner,
Long retentionTime,
PlanTracker planTracker
) {
mFsContext = fsContext;
mDistLoadCliRunner = distLoadCliRunner;
mMigrateCliRunner = migrateCliRunner;
mPersistRunner = persistRunner;
mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor();
mScheduleCleanExecutor.scheduleAtFixedRate(this::
cleanExpiredJobInfos, 60, 600, TimeUnit.SECONDS);
mTraceRetentionTime = retentionTime;
mPlanTracker = planTracker;
}

/**
@@ -134,7 +163,7 @@ private void runDistributedCommand(CmdConfig cmdConfig, long jobControlId)

/**
* Get status information for a CMD.
* @param jobControlId
* @param jobControlId jobControlId to trace a CMD
* @return the Command level status
*/
public Status getCmdStatus(long jobControlId) throws JobDoesNotExistException {
@@ -270,4 +299,37 @@ public CmdStatusBlock getCmdStatusBlock(long jobControlId)
.collect(Collectors.toList());
return new CmdStatusBlock(cmdInfo.getJobControlId(), blockList, cmdInfo.getOperationType());
}

private void cleanExpiredJobInfos() {
long currentTime = System.currentTimeMillis();
for (Map.Entry<Long, CmdInfo> x : mInfoMap.entrySet()) {
CmdInfo cmdInfo = x.getValue();
List<Long> cleanedJobsId = new ArrayList<>();
if (OperationType.DIST_LOAD.equals(cmdInfo.getOperationType())
&& currentTime - cmdInfo.getJobSubmissionTime() > mTraceRetentionTime) {
try {
Status jobStatus = getCmdStatus(cmdInfo.getJobControlId());
if (jobStatus.isFinished()) {
for (CmdRunAttempt runAttempt : cmdInfo.getCmdRunAttempt()) {
cleanedJobsId.add(runAttempt.getJobId());
}
mPlanTracker.removeJobs(cleanedJobsId);
mInfoMap.remove(cmdInfo.getJobControlId());
LOG.info("JobControlId:{} has been cleaned in CmdJobTracker,"
+ " client will not trace the job anymore.The filePaths in CmdInfo are:{}",
cmdInfo.getJobControlId(), String.join(", ", cmdInfo.getFilePath()));
}
} catch (JobDoesNotExistException e) {
LOG.warn("JobControlId:{} can not find in CmdJobTracker when clean expired Job"
+ "with unexpected exception.The filePaths in CmdInfo are:{}",
cmdInfo.getJobControlId(), String.join(", ", cmdInfo.getFilePath()));
}
}
}
}

@Override
public void close() throws Exception {
mScheduleCleanExecutor.shutdown();
}
}
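
The leak fix hinges on this scheduled cleanup: a single-threaded executor runs cleanExpiredJobInfos 60 seconds after startup and every 600 seconds thereafter, and for DIST_LOAD commands whose submission time is older than the retention window and whose status isFinished(), it removes the plan-level state via PlanTracker#removeJobs and drops the CmdInfo entry. A simplified, self-contained sketch of the same pattern using plain JDK types (class, field, and method names below are illustrative, not the Alluxio implementation):

import java.util.Map;
import java.util.concurrent.*;

public class RetentionCleanupSketch {
  private final Map<Long, Long> submissionTimes = new ConcurrentHashMap<>();
  private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
  private final long retentionMs;

  public RetentionCleanupSketch(long retentionMs) {
    this.retentionMs = retentionMs;
    // Mirrors CmdJobTracker: initial delay of 60s, then run every 600s.
    cleaner.scheduleAtFixedRate(this::cleanExpired, 60, 600, TimeUnit.SECONDS);
  }

  public void track(long id) {
    submissionTimes.put(id, System.currentTimeMillis());
  }

  private void cleanExpired() {
    long now = System.currentTimeMillis();
    // ConcurrentHashMap tolerates removal while the cleanup task iterates.
    submissionTimes.entrySet().removeIf(e -> now - e.getValue() > retentionMs);
  }

  public void close() {
    // As in the commit's AutoCloseable implementation, shut the executor down
    // so the scheduled task does not outlive the tracker.
    cleaner.shutdown();
  }
}

Unlike this sketch, the production code also checks the command's status before evicting, so running jobs are never dropped from tracking.
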
job/server/src/test/java/alluxio/master/job/plan/PlanTrackerTest.java
@@ -40,6 +40,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

@@ -145,6 +146,18 @@ public void testGetCoordinator() throws Exception {
((Queue) AlluxioMockUtil.getInternalState(mTracker, "mFinished")).size());
}

@Test
public void removeExpiredJobsInfo() throws Exception {
List<Long> jobIdList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
jobIdList.add(addJob(100));
}
mTracker.removeJobs(jobIdList);
for (Long jobId : jobIdList) {
assertNull("job id should not exist", mTracker.getCoordinator(jobId));
}
}

@Test
public void testDuplicateSetReplicaJobs() throws Exception {
long jobId = mJobIdGenerator.getNewJobId();
job/server/src/test/java/alluxio/master/job/tracker/CmdJobTrackerTest.java
@@ -26,7 +26,10 @@
import alluxio.job.wire.JobSource;
import alluxio.job.wire.SimpleJobStatusBlock;
import alluxio.job.wire.Status;
import alluxio.master.job.JobMaster;
import alluxio.master.job.common.CmdInfo;
import alluxio.master.job.plan.PlanTracker;
import alluxio.master.job.workflow.WorkflowTracker;

import com.beust.jcommander.internal.Lists;
import org.junit.Assert;
@@ -46,6 +49,7 @@
public final class CmdJobTrackerTest {
private static final int REPEATED_ATTEMPT_COUNT = 5;
private static final int ONE_ATTEMPT = 1;
private static final long CAPACITY = 100;

private CmdJobTracker mCmdJobTracker;
private FileSystem mFs;
@@ -55,25 +59,32 @@ public final class CmdJobTrackerTest {
private MigrateCliRunner mMigrateCliRunner;
private DistLoadCliRunner mDistLoadRunner;
private PersistRunner mPersistRunner;

private PlanTracker mPlanTracker;
private LoadCliConfig mLoad;
private MigrateCliConfig mMigrate;
private List<Status> mSearchingCriteria = Lists.newArrayList();
private WorkflowTracker mWorkflowTracker;
private JobMaster mMockJobMaster;

private Long mRetentionTime;

@Rule
public ExpectedException mException = ExpectedException.none();

@Before
public void before() throws Exception {
mFs = mock(FileSystem.class);
mRetentionTime = 1000L;
FileSystemContext fsCtx = mock(FileSystemContext.class);

mMigrateCliRunner = mock(MigrateCliRunner.class);
mDistLoadRunner = mock(DistLoadCliRunner.class);
mPersistRunner = mock(PersistRunner.class);

mPlanTracker = mock(PlanTracker.class);
mMockJobMaster = mock(JobMaster.class);
mWorkflowTracker = new WorkflowTracker(mMockJobMaster);
mCmdJobTracker = new CmdJobTracker(fsCtx,
mDistLoadRunner, mMigrateCliRunner, mPersistRunner);
mDistLoadRunner, mMigrateCliRunner, mPersistRunner, mRetentionTime, mPlanTracker);

mLoad = new LoadCliConfig("/path/to/load", 3, 1, Collections.EMPTY_SET,
Collections.EMPTY_SET, Collections.EMPTY_SET, Collections.EMPTY_SET, true);
@@ -97,6 +108,33 @@ public void runDistLoadBatchCompleteTest() throws Exception {
Assert.assertEquals(s, Status.COMPLETED);
}

@Test
public void runCleanExpiredJobsTest() throws Exception {
generateLoadCommandForStatus(Status.CANCELED);
generateLoadCommandForStatus(Status.RUNNING);
generateLoadCommandForStatus(Status.FAILED);
generateLoadCommandForStatus(Status.COMPLETED);
generateLoadCommandForStatus(Status.CREATED);
Thread.sleep(70000L);
// the expired job has been cleaned in mInfoMap
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.CANCELED);
Set<Long> cancelCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(0, cancelCmdIds.size());
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.COMPLETED);
Set<Long> completedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(0, completedCmdIds.size());
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.FAILED);
Set<Long> failedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(0, failedCmdIds.size());
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.RUNNING);
Set<Long> runningCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(2, runningCmdIds.size());
}

@Test
public void runDistLoadBatchFailTest() throws Exception {
CmdInfo cmdInfo = new CmdInfo(mLoadJobId, OperationType.DIST_LOAD,
@@ -316,7 +354,7 @@ public void testGetCmdStatusBlock() throws Exception {

// Below are all help functions.
private void prepareDistLoadTest(
CmdInfo cmdInfo, LoadCliConfig loadCliConfig, long loadId) throws Exception {
CmdInfo cmdInfo, LoadCliConfig loadCliConfig, long loadId) throws Exception {
AlluxioURI filePath = new AlluxioURI(loadCliConfig.getFilePath());
int replication = loadCliConfig.getReplication();
Set<String> workerSet = loadCliConfig.getWorkerSet();
Expand All @@ -325,7 +363,7 @@ private void prepareDistLoadTest(
Set<String> excludedLocalityIds = loadCliConfig.getExcludedLocalityIds();
boolean directCache = loadCliConfig.getDirectCache();
int batch = loadCliConfig.getBatchSize();

// Mock the behavior of runDistLoad
when(mDistLoadRunner.runDistLoad(batch, filePath, replication, workerSet,
excludedWorkerSet, localityIds, excludedLocalityIds, directCache, loadId))
.thenReturn(cmdInfo);
