Skip to content

Commit

Permalink
[GOBBLIN-2171] Streamline log messages for easier debugging of GaaS m…
Browse files Browse the repository at this point in the history
…ulti-active DAG processing (#4074)
  • Loading branch information
phet authored Nov 13, 2024
1 parent edaf474 commit 13a6926
Show file tree
Hide file tree
Showing 17 changed files with 175 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.linkedin.restli.server.RestLiServiceException;

/**
* Exception thrown by {@link FlowConfigsResourceHandler} when it cannot handle Restli gracefully.
* Exception thrown by {@link FlowConfigsResourceHandlerInterface} when it cannot handle Restli gracefully.
*/
public class FlowConfigLoggedException extends RestLiServiceException {
private static final Logger log = LoggerFactory.getLogger(FlowConfigLoggedException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public List<FlowConfig> getAll(@Context PagingContext pagingContext) {

/**
* Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.
* If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}
* If a parameter is null, it is ignored.
*/
@Finder("filterFlows")
public List<FlowConfig> getFilteredFlows(@Context PagingContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package org.apache.gobblin.service.modules.flow;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;

public class FlowUtils {
/**
* A FlowSpec contains a FlowExecutionId if it is a runOnce flow.
* Refer {@link FlowConfigResourceLocalHandler#createFlowSpecForConfig} for details.
* Refer {@link org.apache.gobblin.service.modules.restli.FlowConfigsV2ResourceHandler#createFlowSpecForConfig} for details.
* @param spec flow spec
* @return flow execution id
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,13 @@ public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long remind
boolean isDeadlineReminder) throws SchedulerException {
DagActionStore.DagAction dagAction = leaseParams.getDagAction();
JobDetail jobDetail = createReminderJobDetail(leaseParams, isDeadlineReminder);
Trigger trigger = createReminderJobTrigger(leaseParams, reminderDurationMillis,
System::currentTimeMillis, isDeadlineReminder);
log.info("Going to set reminder for dagAction {} to fire after {} ms, isDeadlineTrigger: {}",
dagAction, reminderDurationMillis, isDeadlineReminder);
Trigger trigger = createReminderJobTrigger(leaseParams, reminderDurationMillis, System::currentTimeMillis, isDeadlineReminder);
log.info("Setting reminder for {} in {} ms, isDeadlineTrigger: {}", dagAction, reminderDurationMillis, isDeadlineReminder);
quartzScheduler.scheduleJob(jobDetail, trigger);
}

public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams, boolean isDeadlineTrigger) throws SchedulerException {
log.info("Reminder unset for LeaseParams {}, isDeadlineTrigger: {}", leaseParams, isDeadlineTrigger);
log.info("Unsetting reminder for {}, isDeadlineTrigger: {}", leaseParams, isDeadlineTrigger);
if (!quartzScheduler.deleteJob(createJobKey(leaseParams, isDeadlineTrigger))) {
log.warn("Reminder not found for {}. Possibly the event is received out-of-order.", leaseParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public DagManagementTaskStreamImpl(Config config,

@Override
public synchronized void addDagAction(DagActionStore.LeaseParams leaseParams) {
log.info("Adding {} to queue...", leaseParams);
log.info("Enqueuing {}", leaseParams);
if (!this.leaseParamsQueue.offer(leaseParams)) {
throw new RuntimeException(String.format("Could not add %s to the queue", leaseParams));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,16 @@ public void run() {
DagTask dagTask = dagTaskStream.next(); // blocking call
if (dagTask == null) {
//todo - add a metrics to count the times dagTask was null
log.warn("Received a null dag task, ignoring.");
log.warn("Ignoring null dag task!");
continue;
}
DagProc<?> dagProc = dagTask.host(dagProcFactory);
try {
dagProc.process(dagManagementStateStore, dagProcEngineMetrics);
dagTask.conclude();
log.info("Concluded dagTask : {}", dagTask);
log.info(dagProc.contextualizeStatus("concluded dagTask"));
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ public void handleFlowKillTriggerEvent(Properties jobProps, DagActionStore.Lease

private void handleFlowTriggerEvent(Properties jobProps, DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId)
throws IOException {
long previousEventTimeMillis = leaseParams.getEventTimeMillis();
log.info("Handling trigger {} (adoptConsensusTimestamp: {})", leaseParams, adoptConsensusFlowExecutionId);
long origEventTimeMillis = leaseParams.getEventTimeMillis();
LeaseAttemptStatus leaseAttempt = this.multiActiveLeaseArbiter.tryAcquireLease(leaseParams, adoptConsensusFlowExecutionId);
if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
&& persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseAttempt.getConsensusDagAction(),
previousEventTimeMillis);
&& persistDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)) {
log.info("Successfully persisted (origEventTimestamp: {}) {}", origEventTimeMillis, leaseAttempt.getConsensusLeaseParams());
} else { // when NOT successfully `persistDagAction`, set a reminder to re-attempt handling (unless leasing finished)
calcLeasedToAnotherStatusForReminder(leaseAttempt).ifPresent(leasedToAnother ->
scheduleReminderForEvent(jobProps, leasedToAnother, previousEventTimeMillis));
scheduleReminderForEvent(jobProps, leasedToAnother, origEventTimeMillis));
}
}

Expand All @@ -146,11 +146,13 @@ private Optional<LeaseAttemptStatus.LeasedToAnotherStatus> calcLeasedToAnotherSt
/**
* Called after obtaining a lease to both persist to the {@link DagActionStore} and
* {@link MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
*
* Presently used for both `LAUNCH` and `KILL` `DagAction`s
*/
private boolean persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
DagActionStore.DagAction launchDagAction = leaseStatus.getConsensusDagAction();
private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
DagActionStore.DagAction dagAction = leaseStatus.getConsensusLeaseParams().getDagAction();
try {
this.dagManagementStateStore.addDagAction(launchDagAction);
this.dagManagementStateStore.addDagAction(dagAction);
this.numFlowsSubmitted.mark();
// after successfully persisting, close the lease
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
Expand All @@ -165,25 +167,24 @@ private boolean persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus le
* @param jobProps
* @param status used to extract event to be reminded for (stored in `consensusDagAction`) and the minimum time after
* which reminder should occur
* @param triggerEventTimeMillis the event timestamp we were originally handling (only used for logging purposes)
* @param origEventTimeMillis the event timestamp we were originally handling (only used for logging)
*/
private void scheduleReminderForEvent(Properties jobProps, LeaseAttemptStatus.LeasedToAnotherStatus status,
long triggerEventTimeMillis) {
DagActionStore.DagAction consensusDagAction = status.getConsensusDagAction();
private void scheduleReminderForEvent(Properties jobProps, LeaseAttemptStatus.LeasedToAnotherStatus status, long origEventTimeMillis) {
DagActionStore.LeaseParams consensusLeaseParams = status.getConsensusLeaseParams();
JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
try {
if (!this.schedulerService.getScheduler().checkExists(origJobKey)) {
log.warn("Skipping setting a reminder for a job that does not exist in the scheduler. Key: {}", origJobKey);
log.warn("Skipping flow launch reminder for unknown job ({}; origEventTimestamp: {}) {}", origJobKey, origEventTimeMillis, consensusLeaseParams);
this.jobDoesNotExistInSchedulerCount.inc();
return;
}
Trigger reminderTrigger = createAndScheduleReminder(origJobKey, status, status.getEventTimeMillis());
log.info("Flow Launch Handler - [{}, eventTimestamp: {}] - SCHEDULED REMINDER for event {} in {} millis",
consensusDagAction, triggerEventTimeMillis, status.getEventTimeMillis(), reminderTrigger.getNextFireTime());
Trigger reminderTrigger = createAndScheduleReminder(origJobKey, status);
log.info("Scheduled flow launch reminder for job ({}; origEventTimestamp: {}) {} at {}", origJobKey, origEventTimeMillis, consensusLeaseParams,
reminderTrigger.getNextFireTime());
} catch (SchedulerException e) {
log.warn("Failed to add job reminder due to SchedulerException for job {} trigger event {}. Exception: {}",
origJobKey, status.getEventTimeMillis(), e);
log.warn(String.format("Failed to schedule flow launch reminder for job (%s; origEventTimestamp %s) %s", origJobKey, origEventTimeMillis,
consensusLeaseParams), e);
this.failedToSetEventReminderCount.inc();
}
}
Expand All @@ -194,12 +195,11 @@ private void scheduleReminderForEvent(Properties jobProps, LeaseAttemptStatus.Le
* triggerEventTimeMillis to revisit upon firing.
* @param origJobKey
* @param status
* @param triggerEventTimeMillis
* @return Trigger for reminder
* @throws SchedulerException
*/
protected Trigger createAndScheduleReminder(JobKey origJobKey, LeaseAttemptStatus.LeasedToAnotherStatus status,
long triggerEventTimeMillis) throws SchedulerException {
protected Trigger createAndScheduleReminder(JobKey origJobKey, LeaseAttemptStatus.LeasedToAnotherStatus status)
throws SchedulerException {
// Generate a suffix to differentiate the reminder Job and Trigger from the original JobKey and Trigger, so we can
// allow us to keep track of additional properties needed for reminder events (all triggers associated with one job
// refer to the same set of jobProperties)
Expand All @@ -209,9 +209,7 @@ protected Trigger createAndScheduleReminder(JobKey origJobKey, LeaseAttemptStatu
jobDetail.setKey(reminderJobKey);
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey, getJobPropertiesFromJobDetail(jobDetail),
Optional.of(reminderSuffix));
log.debug("Flow Launch Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {} with "
+ "reminderJobKey {} and reminderTriggerKey {}", status.getConsensusDagAction(), triggerEventTimeMillis,
status.getEventTimeMillis(), reminderJobKey, reminderTrigger.getKey());
log.debug("Scheduling flow launch reminder for job ({}; renamed: {}) {}", origJobKey, reminderJobKey, status.getConsensusLeaseParams());
this.schedulerService.getScheduler().scheduleJob(jobDetail, reminderTrigger);
return reminderTrigger;
}
Expand All @@ -224,7 +222,7 @@ protected Trigger createAndScheduleReminder(JobKey origJobKey, LeaseAttemptStatu
*/
@VisibleForTesting
public static String createSuffixForJobTrigger(LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus) {
return "reminder_for_" + leasedToAnotherStatus.getEventTimeMillis();
return "reminder_for_" + leasedToAnotherStatus.getConsensusLeaseParams().getEventTimeMillis();
}

/**
Expand Down Expand Up @@ -279,7 +277,7 @@ public static JobDataMap cloneAndUpdateJobProperties(JobDataMap jobDataMap,
// can differ between participants and be interpreted as a reminder for a distinct flow trigger which will cause
// excess flows to be triggered by the reminder functionality.
newJobProperties.put(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
String.valueOf(leasedToAnotherStatus.getConsensusLeaseParams().getEventTimeMillis()));
// Use this boolean to indicate whether this is a reminder event
newJobProperties.put(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, String.valueOf(true));
// Replace reference to old Properties map with new cloned Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,7 @@ private void initializeMetrics(String metricsPrefix) {

@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean skipFlowExecutionIdReplacement) throws IOException {
LeaseAttemptStatus leaseAttemptStatus =
decoratedMultiActiveLeaseArbiter.tryAcquireLease(leaseParams, skipFlowExecutionIdReplacement);
log.info("Multi-active arbiter attempt for: {} received type of leaseAttemptStatus: [{}, "
+ "eventTimestamp: {}] ", leaseParams, leaseAttemptStatus.getClass().getName(),
leaseParams.getEventTimeMillis());
LeaseAttemptStatus leaseAttemptStatus = decoratedMultiActiveLeaseArbiter.tryAcquireLease(leaseParams, skipFlowExecutionIdReplacement);
if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
if (leaseParams.isReminder()) {
this.leasesObtainedDueToReminderCount.mark();
Expand All @@ -91,8 +87,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams
this.noLongerLeasingStatusCount.inc();
return leaseAttemptStatus;
}
throw new RuntimeException(String.format("Received type of leaseAttemptStatus: %s not handled by this method",
leaseAttemptStatus.getClass().getName()));
throw new RuntimeException(String.format("Unexpected LeaseAttemptStatus (%s) for %s", leaseAttemptStatus.getClass().getName(), leaseParams));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;

import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -29,26 +30,19 @@
* Hierarchy to convey the specific outcome of attempted lease acquisition via the {@link MultiActiveLeaseArbiter},
* with each derived type carrying outcome-specific status info.
*
* IMPL. NOTE: {@link LeaseAttemptStatus#getConsensusDagAction} and {@link LeaseAttemptStatus#getMinimumLingerDurationMillis}
* IMPL. NOTE: {@link LeaseAttemptStatus#getConsensusLeaseParams()} and {@link LeaseAttemptStatus#getMinimumLingerDurationMillis}
* intended for `@Override`.
*/
public abstract class LeaseAttemptStatus {
/**
* @return the {@link DagActionStore.LeaseParams}, containing the dagAction, eventTimeMillis of the event, and boolean
* indicating if it's a reminder event; {@see MultiActiveLeaseArbiter#tryAcquireLease}
* @return the {@link DagActionStore.LeaseParams} containing the {@link DagActionStore.DagAction}, which may now have an updated
* flowExecutionId that MUST henceforth be used
* @see MultiActiveLeaseArbiter#tryAcquireLease
*/
public DagActionStore.LeaseParams getConsensusLeaseParams() {
return null;
}

/**
* @return the {@link DagActionStore.DagAction}, which may now have an updated flowExecutionId that MUST henceforth be
* used; {@see MultiActiveLeaseArbiter#tryAcquireLease}
*/
public DagActionStore.DagAction getConsensusDagAction() {
return null;
}

public long getMinimumLingerDurationMillis() {
return 0;
}
Expand Down Expand Up @@ -79,11 +73,6 @@ public static class LeaseObtainedStatus extends LeaseAttemptStatus {
@Getter(AccessLevel.NONE)
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;

@Override
public DagActionStore.DagAction getConsensusDagAction() {
return consensusLeaseParams.getDagAction();
}

/**
* Completes the lease referenced by this status object if it has not expired.
* @return true if able to complete lease, false otherwise.
Expand All @@ -93,7 +82,8 @@ public boolean completeLease() throws IOException {
return multiActiveLeaseArbiter.recordLeaseSuccess(this);
}

public long getEventTimeMillis() {
@VisibleForTesting
protected long getEventTimeMillis() {
return consensusLeaseParams.getEventTimeMillis();
}
}
Expand All @@ -112,12 +102,8 @@ public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
private final DagActionStore.LeaseParams consensusLeaseParams;
private final long minimumLingerDurationMillis;

@Override
public DagActionStore.DagAction getConsensusDagAction() {
return consensusLeaseParams.getDagAction();
}

public long getEventTimeMillis() {
@VisibleForTesting
protected long getEventTimeMillis() {
return consensusLeaseParams.getEventTimeMillis();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public interface MultiActiveLeaseArbiter {
*
* @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
* was triggered, and if the dag action event we're checking on is a reminder event
* @param adoptConsensusFlowExecutionId if true then replaces the dagAction flowExecutionId returned in
* LeaseAttemptStatuses with the consensus eventTime, accessed via
* {@link LeaseAttemptStatus#getConsensusDagAction()}
* @param adoptConsensusFlowExecutionId if true then replaces the dagAction flowExecutionId returned as a {@link LeaseAttemptStatus}
* with the consensus flowExecutionId (also the eventTime) accessible via
* {@link LeaseAttemptStatus#getConsensusLeaseParams()}
* @return {@link LeaseAttemptStatus}, containing, when `adoptConsensusFlowExecutionId`, a universally-agreed-upon
* {@link DagActionStore.DagAction} with a possibly updated ("laundered") flow execution id that MUST be used thereafter
* @throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void addJobDagAction(String flowGroup, String flowName, long flowExecutio

@Override
public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws IOException {
log.info("Deleting Dag Action {}", dagAction);
log.info("Deleting {}", dagAction);
return this.dagActionStore.deleteDagAction(dagAction);
}

Expand Down
Loading

0 comments on commit 13a6926

Please sign in to comment.