Commit 382a65c
Cancel the function immediately when adding a task submission
harbby committed Dec 27, 2018 · 1 parent 1d772ee
Showing 13 changed files with 168 additions and 192 deletions.
@@ -45,15 +45,15 @@

 @javax.inject.Singleton
 @Path("/job_manger")
-public class JobMangerResurce
+public class JobManagerResurce
 {
-    private static final Logger logger = LoggerFactory.getLogger(JobMangerResurce.class);
+    private static final Logger logger = LoggerFactory.getLogger(JobManagerResurce.class);
 
     @Context private ServletContext servletContext;
     @Context private UriInfo uriInfo;
     private SylphContext sylphContext;
 
-    public JobMangerResurce(
+    public JobManagerResurce(
             @Context ServletContext servletContext,
             @Context UriInfo uriInfo)
     {
82 changes: 32 additions & 50 deletions sylph-main/src/main/java/ideal/sylph/main/service/JobManager.java
@@ -33,14 +33,15 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static ideal.sylph.spi.exception.StandardErrorCode.ILLEGAL_OPERATION;
 import static ideal.sylph.spi.exception.StandardErrorCode.JOB_START_ERROR;
 import static ideal.sylph.spi.job.Job.Status.KILLING;
 import static ideal.sylph.spi.job.Job.Status.RUNNING;
 import static ideal.sylph.spi.job.Job.Status.STARTED_ERROR;
 import static ideal.sylph.spi.job.Job.Status.STARTING;
 import static ideal.sylph.spi.job.Job.Status.STOP;
 
 /**
  * JobManager
@@ -54,53 +54,35 @@ public final class JobManager
     @Autowired private RunnerManager runnerManger;
     @Autowired private MetadataManager metadataManager;
 
-    private final ConcurrentMap<String, JobContainer> runningContainers = new ConcurrentHashMap<>();
-
-    private volatile boolean run;
+    private final ConcurrentMap<String, JobContainer> containers = new ConcurrentHashMap<>();
 
     /**
-     * Used for the time-consuming operation of starting a task and submitting it to yarn
+     * Used to do time-consuming task submit operations
      */
     private ExecutorService jobStartPool = Executors.newFixedThreadPool(MaxSubmitJobNum);
 
     private final Thread monitorService = new Thread(() -> {
-        while (run) {
+        while (true) {
             Thread.currentThread().setName("job_monitor");
-            runningContainers.forEach((jobId, container) -> {
-                try {
-                    Job.Status status = container.getStatus();
-                    switch (status) {
-                        case STOP: {
-                            jobStartPool.submit(() -> {
-                                try {
-                                    Thread.currentThread().setName("job_submit_" + jobId);
-                                    logger.warn("Job {}[{}] Status is {}, Soon to start", jobId,
-                                            container.getRunId(), status);
-                                    container.setStatus(STARTING);
-                                    Optional<String> runId = container.run();
-                                    if (container.getStatus() == KILLING) {
-                                        container.shutdown();
-                                    }
-                                    else {
-                                        container.setStatus(RUNNING);
-                                        runId.ifPresent(result -> metadataManager.addMetadata(jobId, result));
-                                    }
-                                }
-                                catch (Exception e) {
-                                    container.setStatus(STARTED_ERROR);
-                                    logger.warn("job {} start error", jobId, e);
-                                }
-                            }); // the Job needs to be restarted
-                        }
-                        case RUNNING:
-                        case STARTED_ERROR:
-                        case STARTING:
-                        case KILLING:
-                        default:
-                    }
-                }
-                catch (Exception e) {
-                    logger.warn("Check job {} status error", jobId, e);
-                }
-            });
+            containers.forEach((jobId, container) -> {
+                Job.Status status = container.getStatus();
+                if (status == STOP) {
+                    Future future = jobStartPool.submit(() -> {
+                        try {
+                            Thread.currentThread().setName("job_submit_" + jobId);
+                            logger.warn("Job {}[{}] Status is {}, Soon to start", jobId,
+                                    container.getRunId(), status);
+                            container.setStatus(STARTING);
+                            Optional<String> runId = container.run();
+                            container.setStatus(RUNNING);
+                            runId.ifPresent(result -> metadataManager.addMetadata(jobId, result));
+                        }
+                        catch (Exception e) {
+                            container.setStatus(STARTED_ERROR);
+                            logger.warn("job {} start error", jobId, e);
+                        }
+                    });
+                    container.setFuture(future);
+                }
+            });

@@ -118,12 +101,12 @@
      */
     public synchronized void startJob(String jobId)
     {
-        if (runningContainers.containsKey(jobId)) {
+        if (containers.containsKey(jobId)) {
             throw new SylphException(JOB_START_ERROR, "Job " + jobId + " already started");
         }
         Job job = this.getJob(jobId).orElseThrow(() -> new SylphException(JOB_START_ERROR, "Job " + jobId + " not found with jobStore"));
-        runningContainers.computeIfAbsent(jobId, k -> runnerManger.createJobContainer(job, null));
-        logger.info("runningContainers size:{}", runningContainers.size());
+        containers.computeIfAbsent(jobId, k -> runnerManger.createJobContainer(job, null));
+        logger.info("deploy job :{}", jobId);
     }

     /**
@@ -132,8 +115,9 @@ public synchronized void startJob(String jobId)
     public synchronized void stopJob(String jobId)
             throws Exception
     {
-        JobContainer container = runningContainers.remove(jobId);
+        JobContainer container = containers.remove(jobId);
         if (container != null) {
+            logger.warn("job {} Cancel submission", jobId);
             metadataManager.removeMetadata(jobId);
             container.shutdown();
         }
@@ -147,7 +131,7 @@ public void saveJob(@NotNull Job job)
     public void removeJob(String jobId)
             throws IOException
     {
-        if (runningContainers.containsKey(jobId)) {
+        if (containers.containsKey(jobId)) {
             throw new SylphException(ILLEGAL_OPERATION, "Can only delete tasks that have been offline");
         }
         jobStore.removeJob(jobId);
@@ -176,15 +160,13 @@ public Collection<Job> listJobs()
     public void start()
             throws IOException
     {
-        this.run = true;
         monitorService.setDaemon(false);
         monitorService.start();
         //--------- init read metadata job status ---------------
         Map<String, String> metadatas = metadataManager.loadMetadata();
         metadatas.forEach((jobId, jobInfo) -> this.getJob(jobId).ifPresent(job -> {
             JobContainer container = runnerManger.createJobContainer(job, jobInfo);
-            runningContainers.put(job.getId(), container);
-            logger.info("runningContainers size:{}", runningContainers.size());
+            containers.put(job.getId(), container);
         }));
     }

@@ -193,15 +175,15 @@ public void start()
      */
     public Optional<JobContainer> getJobContainer(@NotNull String jobId)
     {
-        return Optional.ofNullable(runningContainers.get(jobId));
+        return Optional.ofNullable(containers.get(jobId));
     }
 
     /**
      * get running JobContainer with this runId(demo: yarnAppId)
      */
     public Optional<JobContainer> getJobContainerWithRunId(@NotNull String runId)
     {
-        for (JobContainer container : runningContainers.values()) {
+        for (JobContainer container : containers.values()) {
             if (runId.equals(container.getRunId())) {
                 return Optional.ofNullable(container);
             }
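Note: the monitor loop above stores the Future returned by jobStartPool.submit(...) on the container via container.setFuture(future). The JobContainer side of that contract is in one of the 13 changed files not rendered on this page; the following is a minimal sketch, with hypothetical names, of how holding that Future lets stopJob cancel a submission that is still in flight:

import java.util.concurrent.Future;

// Minimal sketch (hypothetical class/field names); the real logic lives in
// JobContainer/YarnJobContainer, which this page does not render.
class SubmissionHandle
{
    private volatile Future<?> submitFuture;

    public void setFuture(Future<?> future)
    {
        this.submitFuture = future;
    }

    public void shutdown()
    {
        Future<?> future = this.submitFuture;
        if (future != null && !future.isDone()) {
            // cancel(true) interrupts the job_submit_* thread; the YARN
            // launchers below translate that interrupt into a canceled submission
            future.cancel(true);
        }
    }
}

Under that assumption, stopJob removing the container from containers and calling shutdown() is enough to abort a job that is queued or mid-submit, which is what the commit title promises.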
@@ -30,7 +30,6 @@
 import ideal.sylph.spi.job.JobContainer;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,20 +64,7 @@ public JobContainer getYarnContainer(Job job, String lastRunid)
     {
         FlinkYarnJobLauncher jobLauncher = yarnLauncher.get();
 
-        JobContainer yarnJobContainer = new YarnJobContainer(jobLauncher.getYarnClient(), lastRunid)
-        {
-            @Override
-            public Optional<String> run()
-                    throws Exception
-            {
-                logger.info("Instantiating SylphFlinkJob {} at yarnId {}", job.getId());
-                this.setYarnAppId(null);
-                ApplicationId applicationId = jobLauncher.start(job);
-                this.setYarnAppId(applicationId);
-                return Optional.of(applicationId.toString());
-            }
-        };
-        return YarnJobContainer.proxy(yarnJobContainer);
+        return YarnJobContainer.of(jobLauncher.getYarnClient(), lastRunid, () -> jobLauncher.start(job));
     }

     @Override
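YarnJobContainer.of(...) replaces the anonymous YarnJobContainer subclass above; its body is in one of the changed files GitHub did not render here. A plausible reconstruction, assuming the third argument is a Callable returning the Optional<ApplicationId> that the reworked launcher now produces:

import ideal.sylph.spi.job.JobContainer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;

import java.util.Optional;
import java.util.concurrent.Callable;

// Hypothetical sketch; only the call sites appear in the rendered hunks.
// Assumes setYarnAppId(...) and proxy(...) survive from the removed code above.
public static JobContainer of(YarnClient yarnClient, String lastRunId, Callable<Optional<ApplicationId>> submitter)
{
    YarnJobContainer container = new YarnJobContainer(yarnClient, lastRunId)
    {
        @Override
        public Optional<String> run()
                throws Exception
        {
            this.setYarnAppId(null);
            Optional<ApplicationId> appId = submitter.call();   // empty if the submission was canceled
            appId.ifPresent(this::setYarnAppId);
            return appId.map(ApplicationId::toString);
        }
    };
    return YarnJobContainer.proxy(container);
}

Either way, the duplicated anonymous subclass in each runner collapses into a single factory call whose only per-runner part is the submit lambda.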
@@ -15,14 +15,14 @@
  */
 package ideal.sylph.runner.flink.yarn;
 
+import com.github.harbby.gadtry.base.Throwables;
 import com.github.harbby.gadtry.ioc.Autowired;
 import ideal.sylph.runner.flink.FlinkJobConfig;
 import ideal.sylph.runner.flink.FlinkJobHandle;
 import ideal.sylph.runner.flink.FlinkRunner;
 import ideal.sylph.runner.flink.actuator.JobParameter;
 import ideal.sylph.spi.job.Job;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -38,10 +38,12 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.URI;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -65,7 +67,7 @@ public YarnClient getYarnClient()
         return yarnClient;
     }
 
-    public ApplicationId start(Job job)
+    public Optional<ApplicationId> start(Job job)
             throws Exception
     {
         FlinkJobHandle jobHandle = (FlinkJobHandle) job.getJobHandle();
@@ -81,31 +83,35 @@ public ApplicationId start(Job job)
         JobGraph jobGraph = jobHandle.getJobGraph();
         //todo: How to use `savepoints` to restore a job
         //jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("hdfs:///tmp/sylph/apps/savepoints"));
-        return start(descriptor, jobGraph).getClusterId();
+        return start(descriptor, jobGraph);
     }
 
-    private ClusterClient<ApplicationId> start(YarnClusterDescriptor descriptor, JobGraph job)
+    private Optional<ApplicationId> start(YarnClusterDescriptor descriptor, JobGraph job)
             throws Exception
     {
         ApplicationId applicationId = null;
         try {
-            ClusterClient<ApplicationId> client = descriptor.deploy(); //create app master
-            ClusterSpecification specification = new ClusterSpecification.ClusterSpecificationBuilder()
-                    .setMasterMemoryMB(1024)
-                    .setNumberTaskManagers(2)
-                    .setSlotsPerTaskManager(2)
-                    .setTaskManagerMemoryMB(1024)
-                    .createClusterSpecification();
-            client.runDetached(job, null); //submit graph to yarn appMaster and run detached
+            ClusterClient<ApplicationId> client = descriptor.deploy(); //create yarn appMaster
+            applicationId = client.getClusterId();
+            client.runDetached(job, null); //submit graph to appMaster and detach
             stopAfterJob(client, job.getJobID());
-            return client;
+            client.shutdown();
+            return Optional.of(applicationId);
         }
         catch (Exception e) {
-            if (applicationId != null) {
-                yarnClient.killApplication(applicationId);
-            }
-            throw e;
+            Thread thread = Thread.currentThread();
+            if (e instanceof InterruptedIOException ||
+                    thread.isInterrupted() ||
+                    Throwables.getRootCause(e) instanceof InterruptedException) {
+                logger.warn("job {} Canceled submission", job.getJobID());
+                return Optional.empty();
+            }
+            else {
+                throw e;
+            }
         }
         finally {
             //Clear temporary directory
@@ -114,6 +120,7 @@ private ClusterClient<ApplicationId> start(YarnClusterDescriptor descriptor, Job
                 FileSystem hdfs = FileSystem.get(clusterConf.yarnConf());
                 Path appDir = new Path(clusterConf.appRootDir(), applicationId.toString());
                 hdfs.delete(appDir, true);
+                logger.info("clear tmp dir: {}", appDir);
             }
         }
         catch (IOException e) {
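The new catch block classifies a failed submit as "canceled" when the exception is an InterruptedIOException, the submitting thread's interrupt flag is set, or the root cause of the exception chain is an InterruptedException — exactly the signals produced when Future.cancel(true) interrupts the job_submit_* thread. Throwables.getRootCause here comes from the gadtry library; a dependency-free sketch of the same test:

import java.io.InterruptedIOException;

// Equivalent of the interruption test above, with gadtry's
// Throwables.getRootCause inlined as a cause-chain walk.
static boolean submissionWasCanceled(Exception e)
{
    Throwable root = e;
    while (root.getCause() != null && root.getCause() != root) {
        root = root.getCause();
    }
    return e instanceof InterruptedIOException
            || Thread.currentThread().isInterrupted()
            || root instanceof InterruptedException;
}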
@@ -131,27 +131,23 @@ public YarnClient getYarnClient()
     }
 
     public ClusterClient<ApplicationId> deploy()
+            throws Exception
     {
-        try {
-            YarnClientApplication application = yarnClient.createApplication();
-            ApplicationReport report = startAppMaster(application);
-
-            Configuration flinkConfiguration = getFlinkConfiguration();
-            flinkConfiguration.setString(JobManagerOptions.ADDRESS.key(), report.getHost());
-            flinkConfiguration.setInteger(JobManagerOptions.PORT.key(), report.getRpcPort());
-
-            flinkConfiguration.setString(RestOptions.ADDRESS, report.getHost());
-            flinkConfiguration.setInteger(RestOptions.PORT, report.getRpcPort());
-
-            //return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()).getMaxSlots();
-            return new YarnClusterClient(this,
-                    appConf.getTaskManagerCount(),
-                    appConf.getTaskManagerSlots(),
-                    report, clusterConf.flinkConfiguration(), false);
-        }
-        catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        YarnClientApplication application = yarnClient.createApplication();
+        ApplicationReport report = startAppMaster(application);
+
+        Configuration flinkConfiguration = getFlinkConfiguration();
+        flinkConfiguration.setString(JobManagerOptions.ADDRESS.key(), report.getHost());
+        flinkConfiguration.setInteger(JobManagerOptions.PORT.key(), report.getRpcPort());
+
+        flinkConfiguration.setString(RestOptions.ADDRESS, report.getHost());
+        flinkConfiguration.setInteger(RestOptions.PORT, report.getRpcPort());
+
+        //return new RestClusterClient<>(flinkConfiguration, report.getApplicationId()).getMaxSlots();
+        return new YarnClusterClient(this,
+                appConf.getTaskManagerCount(),
+                appConf.getTaskManagerSlots(),
+                report, clusterConf.flinkConfiguration(), false);
     }

     private ApplicationReport startAppMaster(YarnClientApplication application)
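Dropping the catch-and-wrap in deploy() is part of the same cancellation path: the method now declares throws Exception and propagates the original failure unchanged, so the launcher's catch block above can test the exception's type and root cause directly instead of receiving everything re-wrapped in a RuntimeException.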
@@ -26,13 +26,11 @@
 import ideal.sylph.spi.job.ContainerFactory;
 import ideal.sylph.spi.job.Job;
 import ideal.sylph.spi.job.JobContainer;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.StreamingContext;
 
-import java.util.Optional;
 import java.util.function.Supplier;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -50,20 +48,8 @@ public class SparkContainerFactory
     public JobContainer getYarnContainer(Job job, String lastRunid)
     {
         SparkAppLauncher appLauncher = yarnLauncher.get();
-        final JobContainer yarnJobContainer = new YarnJobContainer(appLauncher.getYarnClient(), lastRunid)
-        {
-            @Override
-            public Optional<String> run()
-                    throws Exception
-            {
-                this.setYarnAppId(null);
-                ApplicationId yarnAppId = appLauncher.run(job);
-                this.setYarnAppId(yarnAppId);
-                return Optional.of(yarnAppId.toString());
-            }
-        };
-        //----create JobContainer Proxy
-        return YarnJobContainer.proxy(yarnJobContainer);
+        return YarnJobContainer.of(appLauncher.getYarnClient(), lastRunid, () -> appLauncher.run(job));
     }

     @Override
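The Spark factory change mirrors the Flink one above: both runners now obtain their container from the same YarnJobContainer.of(yarnClient, lastRunId, submitter) entry point, so the submission-cancellation behavior sketched earlier applies to Spark jobs as well.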