Skip to content

Commit

Permalink
Add new workflow info fields (#1853)
Browse files Browse the repository at this point in the history
Add new workflow info fields
  • Loading branch information
Quinn-With-Two-Ns authored Aug 30, 2023
1 parent 4298dad commit ac474fa
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,18 @@ boolean getVersion(
*/
long getCurrentWorkflowTaskStartedEventId();

/**
* @return size of Workflow history in bytes up until the current moment of execution. This value
* changes during the lifetime of a Workflow Execution.
*/
long getHistorySize();

/**
* @return true if the server is configured to suggest continue as new and it is suggested. This
* value changes during the lifetime of a Workflow Execution.
*/
boolean isContinueAsNewSuggested();

/**
* @return true if cancellation of the workflow is requested.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,16 @@ public long getCurrentWorkflowTaskStartedEventId() {
return workflowStateMachines.getCurrentStartedEventId();
}

@Override
public long getHistorySize() {
return workflowStateMachines.getHistorySize();
}

@Override
public boolean isContinueAsNewSuggested() {
return workflowStateMachines.isContinueAsNewSuggested();
}

/*
* MUTABLE STATE OPERATIONS
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ enum HandleEventStatus {
/** EventId of the last WorkflowTaskStarted event handled by these state machines. */
private long currentStartedEventId;

private long historySize;

private boolean isContinueAsNewSuggested;

/**
* EventId of the last event seen by these state machines. Events earlier than this one will be
* discarded.
Expand Down Expand Up @@ -205,6 +209,14 @@ public long getCurrentStartedEventId() {
return currentStartedEventId;
}

public long getHistorySize() {
return historySize;
}

public boolean isContinueAsNewSuggested() {
return isContinueAsNewSuggested;
}

public void setReplaying(boolean replaying) {
this.replaying = replaying;
}
Expand Down Expand Up @@ -1126,7 +1138,11 @@ private void assertMatch(
private class WorkflowTaskCommandsListener implements WorkflowTaskStateMachine.Listener {
@Override
public void workflowTaskStarted(
long startedEventId, long currentTimeMillis, boolean nonProcessedWorkflowTask) {
long startedEventId,
long currentTimeMillis,
boolean nonProcessedWorkflowTask,
long historySize,
boolean isContinueAsNewSuggested) {
setCurrentTimeMillis(currentTimeMillis);
for (CancellableCommand cancellableCommand : commands) {
cancellableCommand.handleWorkflowTaskStarted();
Expand All @@ -1140,6 +1156,8 @@ public void workflowTaskStarted(
}
}
WorkflowStateMachines.this.currentStartedEventId = startedEventId;
WorkflowStateMachines.this.historySize = historySize;
WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;

eventLoop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,15 @@ public interface Listener {
* workflow execution this is the last event in the history. During replay (due to query for
* example) the last workflow task still can return false if it is followed by other events
* like WorkflowExecutionCompleted.
* @param historySize current size, in bytes, of the workflow history.
* @param isContinueAsNewSuggested true if the server suggests this workflow to continue as new.
*/
void workflowTaskStarted(
long startEventId, long currentTimeMillis, boolean nonProcessedWorkflowTask);
long startEventId,
long currentTimeMillis,
boolean nonProcessedWorkflowTask,
long historySize,
boolean isContinueAsNewSuggested);

void updateRunId(String currentRunId);
}
Expand All @@ -58,6 +64,8 @@ void workflowTaskStarted(
// TODO write a comment describing the difference between workflowTaskStartedEventId and
// startedEventId
private long startedEventId;
private long historySize;
private boolean isContinueAsNewSuggested;

public static WorkflowTaskStateMachine newInstance(
long workflowTaskStartedEventId, Listener listener) {
Expand Down Expand Up @@ -112,6 +120,10 @@ enum State {
private void handleStarted() {
eventTimeOfTheLastWorkflowStartTask = Timestamps.toMillis(currentEvent.getEventTime());
startedEventId = currentEvent.getEventId();
historySize = currentEvent.getWorkflowTaskStartedEventAttributes().getHistorySizeBytes();
isContinueAsNewSuggested =
currentEvent.getWorkflowTaskStartedEventAttributes().getSuggestContinueAsNew();

// The last started event in the history. So no completed is expected.
if (currentEvent.getEventId() >= workflowTaskStartedEventId && !hasNextEvent) {
handleCompleted();
Expand All @@ -125,7 +137,11 @@ private void handleCompleted() {
// If the workflow task has FAILED or other unsuccessful finish event, we don't replay such
// workflow tasks
listener.workflowTaskStarted(
startedEventId, eventTimeOfTheLastWorkflowStartTask, lastTaskInHistory);
startedEventId,
eventTimeOfTheLastWorkflowStartTask,
lastTaskInHistory,
historySize,
isContinueAsNewSuggested);
}

private void handleFailed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ public long getHistoryLength() {
return context.getCurrentWorkflowTaskStartedEventId();
}

@Override
public long getHistorySize() {
return context.getHistorySize();
}

@Override
public boolean isContinueAsNewSuggested() {
return context.isContinueAsNewSuggested();
}

@Override
public String toString() {
return "WorkflowInfo{"
Expand Down
12 changes: 12 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,16 @@ public interface WorkflowInfo {
* call {@link Workflow#continueAsNew(Object...)}.
*/
long getHistoryLength();

/**
* @return size of Workflow history in bytes up until the current moment of execution. This value
* changes during the lifetime of a Workflow Execution.
*/
long getHistorySize();

/**
* @return true if the server is configured to suggest continue as new and it is suggested. This
* value changes during the lifetime of a Workflow Execution.
*/
boolean isContinueAsNewSuggested();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow;

import static org.junit.Assert.assertEquals;

import io.temporal.activity.LocalActivityOptions;
import io.temporal.testing.WorkflowReplayer;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;

public class GetHistorySizeTest {
private static final TestActivities.VariousTestActivities activitiesImpl =
new TestActivities.TestActivitiesImpl();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflowImpl.class)
.setActivityImplementations(activitiesImpl)
.build();

@Test
public void replay() throws Exception {
WorkflowReplayer.replayWorkflowExecutionFromResource(
"testGetHistorySize.json", TestWorkflowImpl.class);
}

public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {

@Override
public String execute() {
LocalActivityOptions options =
LocalActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(30))
.build();

TestActivities.VariousTestActivities activities =
Workflow.newLocalActivityStub(TestActivities.VariousTestActivities.class, options);

assertEquals(408, Workflow.getInfo().getHistorySize());
assertEquals(false, Workflow.getInfo().isContinueAsNewSuggested());

// Force WFT heartbeat
activities.sleepActivity(TimeUnit.SECONDS.toMillis(10), 1);

assertEquals(897, Workflow.getInfo().getHistorySize());
assertEquals(true, Workflow.getInfo().isContinueAsNewSuggested());

return "done";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
/** Typed attribute translation of {@link SearchAttributesTest} */
public class TypedSearchAttributesTest {
private static final SearchAttributeKey<List<String>> TEST_NEW_KEY =
SearchAttributeKey.forKeywordList("NewKey");
SearchAttributeKey.forKeywordList("NewKeyList");
private static final List<String> TEST_NEW_VALUE = Arrays.asList("foo", "bar");
private static final SearchAttributeKey<String> TEST_UNKNOWN_KEY =
SearchAttributeKey.forText("UnknownKey");
Expand Down
Loading

0 comments on commit ac474fa

Please sign in to comment.