Skip to content

Commit

Permalink
[GOBBLIN-2191] Shutdown GobblinJobLauncher executor (#4094)
Browse files Browse the repository at this point in the history
Shutdown GobblinJobLauncher executor to fix AM graceful shutdown
  • Loading branch information
abhishekmjain authored Jan 29, 2025
1 parent e584e5b commit 09a3e21
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.fs.Path;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
Expand All @@ -58,6 +59,7 @@
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
Expand Down Expand Up @@ -128,7 +130,8 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
this.taskStateCollectorService =
new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.eventSubmitter,
this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository());
this.executor = Executors.newSingleThreadExecutor();
this.executor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("GobblinJobLauncher")));
}

@Override
Expand Down Expand Up @@ -180,6 +183,8 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
waitJob(submitJobFuture);
log.info(String.format("Job %s completed", this.jobContext.getJobId()));
} finally {
ExecutorsUtils.shutdownExecutorService(executor, Optional.of(log));

// The last iteration of output TaskState collecting will run when the collector service gets stopped
this.taskStateCollectorService.stopAsync().awaitTerminated();
cleanupWorkingDirectory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,12 @@ ApplicationId setupAndSubmitApplication() throws IOException, YarnException, Int
LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
LOGGER.info("Application User: " + applicationReport.getUser() + " Queue: " + applicationReport.getQueue());

// Temporal workflow tracking url
String temporalWorkflowTrackingUrl = ConfigUtils.getString(config, "gobblin.temporal.ui.server.url", "");
if (StringUtils.isNotBlank(temporalWorkflowTrackingUrl)) {
LOGGER.info("Temporal Workflow Tracking URL: " + temporalWorkflowTrackingUrl);
}

return applicationId;
}

Expand Down

0 comments on commit 09a3e21

Please sign in to comment.