diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java index 13ecfd1ebc..c358b368a3 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; @@ -58,6 +59,7 @@ import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.ParallelRunner; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; @@ -128,7 +130,8 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir, this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.eventSubmitter, this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository()); - this.executor = Executors.newSingleThreadExecutor(); + this.executor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("GobblinJobLauncher"))); } @Override @@ -180,6 +183,8 @@ protected void runWorkUnits(List workUnits) throws Exception { waitJob(submitJobFuture); log.info(String.format("Job %s completed", this.jobContext.getJobId())); } finally { + ExecutorsUtils.shutdownExecutorService(executor, Optional.of(log)); + // The last iteration of output TaskState collecting will run when the collector service gets stopped this.taskStateCollectorService.stopAsync().awaitTerminated(); cleanupWorkingDirectory(); diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java index 031e295aa0..d2f2180fff 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java @@ -643,6 +643,12 @@ ApplicationId setupAndSubmitApplication() throws IOException, YarnException, Int LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl()); LOGGER.info("Application User: " + applicationReport.getUser() + " Queue: " + applicationReport.getQueue()); + // Temporal workflow tracking url + String temporalWorkflowTrackingUrl = ConfigUtils.getString(config, "gobblin.temporal.ui.server.url", ""); + if (StringUtils.isNotBlank(temporalWorkflowTrackingUrl)) { + LOGGER.info("Temporal Workflow Tracking URL: " + temporalWorkflowTrackingUrl); + } + return applicationId; }