Skip to content

Commit

Permalink
Ad support for local activity metadata (#2309)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Nov 12, 2024
1 parent c8a27ce commit 24990db
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.activity;

import com.google.common.base.Objects;
import io.temporal.common.Experimental;
import io.temporal.common.MethodRetry;
import io.temporal.common.RetryOptions;
import java.time.Duration;
Expand Down Expand Up @@ -56,6 +57,7 @@ public static final class Builder {
private Duration localRetryThreshold;
private RetryOptions retryOptions;
private Boolean doNotIncludeArgumentsIntoMarker;
private String summary;

/** Copy Builder fields from the options. */
private Builder(LocalActivityOptions options) {
Expand All @@ -68,6 +70,7 @@ private Builder(LocalActivityOptions options) {
this.localRetryThreshold = options.getLocalRetryThreshold();
this.retryOptions = options.getRetryOptions();
this.doNotIncludeArgumentsIntoMarker = options.isDoNotIncludeArgumentsIntoMarker();
this.summary = options.getSummary();
}

/**
Expand Down Expand Up @@ -178,6 +181,18 @@ public Builder setDoNotIncludeArgumentsIntoMarker(boolean doNotIncludeArgumentsI
return this;
}

/**
* Single-line fixed summary for this activity that will appear in UI/CLI. This can be in
* single-line Temporal Markdown format.
*
* <p>Default is none/empty.
*/
@Experimental
public Builder setSummary(String summary) {
this.summary = summary;
return this;
}

public Builder mergeActivityOptions(LocalActivityOptions override) {
if (override == null) {
return this;
Expand All @@ -204,6 +219,7 @@ public Builder mergeActivityOptions(LocalActivityOptions override) {
(override.doNotIncludeArgumentsIntoMarker != null)
? override.doNotIncludeArgumentsIntoMarker
: this.doNotIncludeArgumentsIntoMarker;
this.summary = (override.summary == null) ? this.summary : override.summary;
return this;
}

Expand All @@ -214,7 +230,8 @@ public LocalActivityOptions build() {
scheduleToStartTimeout,
localRetryThreshold,
retryOptions,
doNotIncludeArgumentsIntoMarker);
doNotIncludeArgumentsIntoMarker,
summary);
}

public LocalActivityOptions validateAndBuildWithDefaults() {
Expand All @@ -228,7 +245,8 @@ public LocalActivityOptions validateAndBuildWithDefaults() {
scheduleToStartTimeout,
localRetryThreshold,
RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults(),
doNotIncludeArgumentsIntoMarker);
doNotIncludeArgumentsIntoMarker,
summary);
}
}

Expand All @@ -238,20 +256,23 @@ public LocalActivityOptions validateAndBuildWithDefaults() {
private final Duration scheduleToStartTimeout;
private final RetryOptions retryOptions;
private final Boolean doNotIncludeArgumentsIntoMarker;
private final String summary;

private LocalActivityOptions(
Duration startToCloseTimeout,
Duration scheduleToCloseTimeout,
Duration scheduleToStartTimeout,
Duration localRetryThreshold,
RetryOptions retryOptions,
Boolean doNotIncludeArgumentsIntoMarker) {
Boolean doNotIncludeArgumentsIntoMarker,
String summary) {
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.startToCloseTimeout = startToCloseTimeout;
this.scheduleToStartTimeout = scheduleToStartTimeout;
this.localRetryThreshold = localRetryThreshold;
this.retryOptions = retryOptions;
this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker;
this.summary = summary;
}

public Duration getScheduleToCloseTimeout() {
Expand All @@ -278,6 +299,11 @@ public boolean isDoNotIncludeArgumentsIntoMarker() {
return doNotIncludeArgumentsIntoMarker != null && doNotIncludeArgumentsIntoMarker;
}

@Experimental
public String getSummary() {
return summary;
}

public Builder toBuilder() {
return new Builder(this);
}
Expand All @@ -292,7 +318,8 @@ public boolean equals(Object o) {
&& Objects.equal(startToCloseTimeout, that.startToCloseTimeout)
&& Objects.equal(scheduleToStartTimeout, that.scheduleToStartTimeout)
&& Objects.equal(localRetryThreshold, that.localRetryThreshold)
&& Objects.equal(retryOptions, that.retryOptions);
&& Objects.equal(retryOptions, that.retryOptions)
&& Objects.equal(summary, that.summary);
}

@Override
Expand All @@ -303,7 +330,8 @@ public int hashCode() {
scheduleToStartTimeout,
localRetryThreshold,
retryOptions,
doNotIncludeArgumentsIntoMarker);
doNotIncludeArgumentsIntoMarker,
summary);
}

@Override
Expand All @@ -319,6 +347,8 @@ public String toString() {
+ retryOptions
+ ", doNotIncludeArgumentsIntoMarker="
+ isDoNotIncludeArgumentsIntoMarker()
+ ", summary="
+ summary
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.api.common.v1.ActivityType;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.workflow.Functions;
Expand Down Expand Up @@ -51,21 +52,24 @@ public class ExecuteLocalActivityParameters {
private final boolean doNotIncludeArgumentsIntoMarker;
private final @Nullable Duration scheduleToStartTimeout;
private @Nullable Functions.Proc onNewAttemptCallback;
private final UserMetadata metadata;

public ExecuteLocalActivityParameters(
@Nonnull PollActivityTaskQueueResponse.Builder activityTaskBuilder,
@Nullable Duration scheduleToStartTimeout,
long originalScheduledTimestamp,
@Nullable Failure previousLocalExecutionFailure,
boolean doNotIncludeArgumentsIntoMarker,
@Nonnull Duration localRetryThreshold) {
@Nonnull Duration localRetryThreshold,
UserMetadata metadata) {
this.activityTaskBuilder = Objects.requireNonNull(activityTaskBuilder, "activityTaskBuilder");
this.scheduleToStartTimeout = scheduleToStartTimeout;
this.originalScheduledTimestamp = originalScheduledTimestamp;
this.previousLocalExecutionFailure = previousLocalExecutionFailure;
this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker;
this.localRetryThreshold = localRetryThreshold;
this.onNewAttemptCallback = null;
this.metadata = metadata;
}

public String getActivityId() {
Expand Down Expand Up @@ -136,4 +140,8 @@ public Functions.Proc getOnNewAttemptCallback() {
public void setOnNewAttemptCallback(@Nonnull Functions.Proc onNewAttemptCallback) {
this.onNewAttemptCallback = onNewAttemptCallback;
}

public UserMetadata getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.temporal.api.failure.v1.CanceledFailureInfo;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
import io.temporal.api.sdk.v1.UserMetadata;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
import io.temporal.common.converter.DefaultDataConverter;
Expand Down Expand Up @@ -63,6 +64,7 @@ final class LocalActivityStateMachine
private final LocalActivityCallback callback;

private ExecuteLocalActivityParameters localActivityParameters;
private @Nullable UserMetadata userMetadata;
private final Functions.Func<Boolean> replaying;

/** Accepts proposed current time. Returns accepted current time. */
Expand Down Expand Up @@ -211,6 +213,7 @@ private LocalActivityStateMachine(
this.replaying = replaying;
this.setCurrentTimeCallback = setCurrentTimeCallback;
this.localActivityParameters = localActivityParameters;
this.userMetadata = localActivityParameters.getMetadata();
this.activityId = localActivityParameters.getActivityId();
this.activityType = localActivityParameters.getActivityType();
this.originalScheduledTimestamp = localActivityParameters.getOriginalScheduledTimestamp();
Expand Down Expand Up @@ -342,11 +345,15 @@ private void createMarker() {
DefaultDataConverter.STANDARD_INSTANCE.toPayloads(localActivityMarkerMetadata).get());
markerAttributes.putAllDetails(details);
}
addCommand(
Command.Builder command =
Command.newBuilder()
.setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER)
.setRecordMarkerCommandAttributes(markerAttributes.build())
.build());
.setRecordMarkerCommandAttributes(markerAttributes.build());
if (userMetadata != null) {
command.setUserMetadata(userMetadata);
userMetadata = null;
}
addCommand(command.build());
}

private void notifyResultFromEvent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,13 +680,18 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3);
}

@Nullable
UserMetadata userMetadata =
makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext);

return new ExecuteLocalActivityParameters(
activityTask,
options.getScheduleToStartTimeout(),
originalScheduledTime,
previousExecutionFailure,
options.isDoNotIncludeArgumentsIntoMarker(),
localRetryThreshold);
localRetryThreshold,
userMetadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ private ReplayWorkflow createReplayWorkflow(WorkflowExecutionHistory workflowExe
System.currentTimeMillis(),
null,
false,
null,
null),
(r, e) -> {});
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void localActivityMeteringHelper() {
0,
null,
false,
null,
null);
laMeteringHelper.addNewLocalActivity(executeLA);
laMeteringHelper.addNewLocalActivity(
Expand All @@ -160,6 +161,7 @@ public void localActivityMeteringHelper() {
0,
null,
false,
null,
null));
for (int i = 0; i < 5; i++) {
executeLA.getOnNewAttemptCallback().apply();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
System.currentTimeMillis(),
null,
true,
null,
null);
ExecuteLocalActivityParameters parameters2 =
new ExecuteLocalActivityParameters(
Expand All @@ -112,6 +113,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
System.currentTimeMillis(),
null,
false,
null,
null);
ExecuteLocalActivityParameters parameters3 =
new ExecuteLocalActivityParameters(
Expand All @@ -122,6 +124,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
System.currentTimeMillis(),
null,
true,
null,
null);

builder
Expand Down Expand Up @@ -303,6 +306,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
System.currentTimeMillis(),
null,
false,
null,
null);
builder
.<Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException>add2(
Expand Down Expand Up @@ -368,6 +372,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
System.currentTimeMillis(),
null,
false,
null,
null);
// TODO: This is a workaround for the lack of support for child workflow in the test
// framework.
Expand Down
Loading

0 comments on commit 24990db

Please sign in to comment.