Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Reset] Handle completion events from restarted children #7295

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions service/history/api/recordchildworkflowcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package recordchildworkflowcompleted
import (
"context"
"errors"
"fmt"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -62,18 +63,20 @@ func Invoke(

// If the parent is reset, we need to follow a possible chain of resets to deliver the completion event to the correct parent.
redirectCount := 0
isForwarded := false
for {
resetRunID, err := recordChildWorkflowCompleted(ctx, request, shardContext, workflowConsistencyChecker)
resetRunID, err := recordChildWorkflowCompleted(ctx, request, shardContext, workflowConsistencyChecker, isForwarded)
if errors.Is(err, consts.ErrWorkflowCompleted) {
// if the parent was reset, forward the request to the new run pointed by resetRunID
// Note: An alternative solution is to load the current run here ane compare the originalRunIDs of the current run and the closed parent.
// Note: An alternative solution is to load the current run here and compare the originalRunIDs of the current run and the closed parent.
// If they match, then deliver it to the current run. We should consider this optimization if we notice that reset chain is longer than 1-2 hops.
if resetRunID != "" {
if redirectCount >= maxResetRedirectCount {
return nil, consts.ErrResetRedirectLimitReached
}
redirectCount++
request.ParentExecution.RunId = resetRunID
isForwarded = true
continue
}
}
Expand All @@ -91,6 +94,7 @@ func recordChildWorkflowCompleted(
request *historyservice.RecordChildExecutionCompletedRequest,
shardContext shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
isForwarded bool,
) (string, error) {
resetRunID := ""
parentInitiatedID := request.ParentInitiatedId
Expand Down Expand Up @@ -153,6 +157,28 @@ func recordChildWorkflowCompleted(
return nil, consts.ErrChildExecutionNotFound
}

childrenInitializedAfterResetPoint := mutableState.GetChildrenInitializedPostResetPoint()
if len(childrenInitializedAfterResetPoint) > 0 {
// This parent was reset and it also has some children that potentially were restarted.
initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, parentInitiatedID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't think we need to load the initiatedEvent. I believe both the child workflow type and the child workflow ID are already available in ChildExecutionInfo in mutable state.

if err != nil {
return nil, consts.ErrChildExecutionNotFound
}
initiatedAttr := initiatedEvent.GetStartChildWorkflowExecutionInitiatedEventAttributes()
childID := fmt.Sprintf("%s:%s", initiatedAttr.GetWorkflowType().Name, initiatedAttr.GetWorkflowId())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we really should create a util function for this concatenation logic. I think I've seen it in 3 different places.

_, ok := childrenInitializedAfterResetPoint[childID]
if ok {
// The child sending this request was restarted. Do not accept any forwarded completion events because the restarted child will directly send its completion event to this parent.
// Sometimes the old child will race to complete before being restarted and it also happens to have the same initialized event ID. In such cases the result will come as forwarded request which we ignore here.
if isForwarded {
return nil, consts.ErrChildExecutionNotFound
}
// The results are from the child that this parent initiated. We should stop tracking the child and process the result now.
delete(childrenInitializedAfterResetPoint, childID)
mutableState.SetChildrenInitializedPostResetPoint(childrenInitializedAfterResetPoint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update ms.approximateSize when setting the reset point.

}
}

completionEvent := request.CompletionEvent
switch completionEvent.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
Expand Down
165 changes: 165 additions & 0 deletions service/history/api/recordchildworkflowcompleted/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func Test_Recordchildworkflowcompleted_WithForwards(t *testing.T) {
newParentMutableState.EXPECT().GetChildExecutionInfo(anyArg).Return(childExecutionInfo, true)
newParentMutableState.EXPECT().HasPendingWorkflowTask().Return(false)
newParentMutableState.EXPECT().AddWorkflowTaskScheduledEvent(anyArg, anyArg).Return(nil, nil)
newParentMutableState.EXPECT().GetChildrenInitializedPostResetPoint().Return(nil)

mockWFContext := workflow.NewMockContext(ctrl)
mockWFContext.EXPECT().UpdateWorkflowExecutionAsActive(anyArg, anyArg).Return(nil)
Expand Down Expand Up @@ -170,3 +171,167 @@ func Test_Recordchildworkflowcompleted_WithInfiniteForwards(t *testing.T) {
require.ErrorIs(t, err, consts.ErrResetRedirectLimitReached)
require.Nil(t, resp)
}

// Test_Recordchildworkflowcompleted_FromRestartedChild checks that a running parent records the
// completion event of a child that is listed in ChildrenInitializedPostResetPoint when the child
// reports directly to that parent run (no forwarding). The main assertion is on the parent's
// mutable state: the reporting child's entry must be gone from the map passed to
// SetChildrenInitializedPostResetPoint, while entries of other children stay untouched.
func Test_Recordchildworkflowcompleted_FromRestartedChild(t *testing.T) {
	ctrl := gomock.NewController(t)
	ctx := context.Background()
	anyArg := gomock.Any()

	nsID := tests.NamespaceID
	childType := uuid.NewString()
	childWorkflowID := uuid.NewString()
	// Tracking key for the child: "<workflow type>:<workflow id>".
	trackedChildKey := childType + ":" + childWorkflowID
	otherChildKey := "AnotherChild"
	parentWorkflowID := uuid.NewString()
	parentRunID := uuid.NewString()
	parentKey := definition.NewWorkflowKey(nsID.String(), parentWorkflowID, parentRunID)

	// The request targets the (only) parent run directly, so exactly one lease lookup is expected.
	request := &historyservice.RecordChildExecutionCompletedRequest{
		NamespaceId: nsID.String(),
		ParentExecution: &commonpb.WorkflowExecution{
			WorkflowId: parentWorkflowID,
			RunId:      parentRunID,
		},
		ChildExecution: &commonpb.WorkflowExecution{WorkflowId: childWorkflowID},
		CompletionEvent: &historypb.HistoryEvent{
			EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
		},
	}

	// Shard-level collaborators.
	registry := namespace.NewMockRegistry(ctrl)
	registry.EXPECT().GetNamespaceByID(nsID).Return(&namespace.Namespace{}, nil)
	clusterMetadata := cluster.NewMockMetadata(ctrl)
	clusterMetadata.EXPECT().GetCurrentClusterName().Return("")
	shardContext := shard.NewMockContext(ctrl)
	shardContext.EXPECT().GetNamespaceRegistry().Return(registry)
	shardContext.EXPECT().GetClusterMetadata().Return(clusterMetadata)

	// Fixtures returned by the parent's mutable state.
	trackedChildren := map[string]*persistencespb.ResetChildInfo{
		trackedChildKey: {},
		otherChildKey:   {},
	}
	childInfo := &persistencespb.ChildExecutionInfo{
		StartedEventId:    int64(10), // the started event has already been recorded.
		StartedWorkflowId: childWorkflowID,
	}
	initiatedEvent := &historypb.HistoryEvent{
		EventType: enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED,
		Attributes: &historypb.HistoryEvent_StartChildWorkflowExecutionInitiatedEventAttributes{
			StartChildWorkflowExecutionInitiatedEventAttributes: &historypb.StartChildWorkflowExecutionInitiatedEventAttributes{
				WorkflowType: &commonpb.WorkflowType{Name: childType},
				WorkflowId:   childWorkflowID,
			},
		},
	}

	parentState := workflow.NewMockMutableState(ctrl)
	parentState.EXPECT().IsWorkflowExecutionRunning().Return(true)
	parentState.EXPECT().GetNextEventID().Return(int64(10))
	parentState.EXPECT().AddChildWorkflowExecutionCompletedEvent(anyArg, anyArg, anyArg).Return(nil, nil)
	parentState.EXPECT().GetChildExecutionInfo(anyArg).Return(childInfo, true)
	parentState.EXPECT().HasPendingWorkflowTask().Return(false)
	parentState.EXPECT().AddWorkflowTaskScheduledEvent(anyArg, anyArg).Return(nil, nil)
	parentState.EXPECT().GetChildrenInitializedPostResetPoint().Return(trackedChildren)
	parentState.EXPECT().SetChildrenInitializedPostResetPoint(trackedChildren).
		Do(func(remaining map[string]*persistencespb.ResetChildInfo) {
			// The reporting child must no longer be tracked ...
			require.NotContains(t, remaining, trackedChildKey)
			// ... while the unrelated child must still be there.
			require.Contains(t, remaining, otherChildKey)
		})
	parentState.EXPECT().GetChildExecutionInitiatedEvent(anyArg, anyArg).Return(initiatedEvent, nil)

	wfContext := workflow.NewMockContext(ctrl)
	wfContext.EXPECT().UpdateWorkflowExecutionAsActive(anyArg, anyArg).Return(nil)

	parentLease := ndc.NewMockWorkflow(ctrl)
	// The mutable state is fetched repeatedly while the update is applied.
	parentLease.EXPECT().GetMutableState().Return(parentState).AnyTimes()
	parentLease.EXPECT().GetReleaseFn().Return(func(_ error) {})
	parentLease.EXPECT().GetContext().Return(wfContext)

	consistencyChecker := api.NewMockWorkflowConsistencyChecker(ctrl)
	consistencyChecker.EXPECT().
		GetWorkflowLeaseWithConsistencyCheck(anyArg, anyArg, anyArg, parentKey, anyArg).
		Return(parentLease, nil)

	resp, err := Invoke(ctx, request, shardContext, consistencyChecker)
	require.NoError(t, err)
	require.NotNil(t, resp)
}

// Test_Recordchildworkflowcompleted_WithForwardsFromStaleChild checks that a completion event which
// reaches the new parent run only via forwarding from a closed (reset) run is rejected with
// ErrChildExecutionNotFound when the new run tracks that child in ChildrenInitializedPostResetPoint.
// This guards against an old child racing to complete before the parent has restarted it.
func Test_Recordchildworkflowcompleted_WithForwardsFromStaleChild(t *testing.T) {
	ctrl := gomock.NewController(t)
	ctx := context.Background()
	anyArg := gomock.Any()

	nsID := tests.NamespaceID
	childType := uuid.NewString()
	childWorkflowID := uuid.NewString()
	// Tracking key for the child: "<workflow type>:<workflow id>".
	trackedChildKey := childType + ":" + childWorkflowID
	parentWorkflowID := uuid.NewString()
	closedRunID := uuid.NewString()
	currentRunID := uuid.NewString()
	closedParentKey := definition.NewWorkflowKey(nsID.String(), parentWorkflowID, closedRunID)
	currentParentKey := definition.NewWorkflowKey(nsID.String(), parentWorkflowID, currentRunID)

	// The stale child addresses the closed run; the handler is expected to follow ResetRunId.
	request := &historyservice.RecordChildExecutionCompletedRequest{
		NamespaceId: nsID.String(),
		ParentExecution: &commonpb.WorkflowExecution{
			WorkflowId: parentWorkflowID,
			RunId:      closedRunID,
		},
		ChildExecution: &commonpb.WorkflowExecution{WorkflowId: childWorkflowID},
		CompletionEvent: &historypb.HistoryEvent{
			EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
		},
	}

	// Shard-level collaborators.
	registry := namespace.NewMockRegistry(ctrl)
	registry.EXPECT().GetNamespaceByID(nsID).Return(&namespace.Namespace{}, nil)
	clusterMetadata := cluster.NewMockMetadata(ctrl)
	clusterMetadata.EXPECT().GetCurrentClusterName().Return("")
	shardContext := shard.NewMockContext(ctrl)
	shardContext.EXPECT().GetNamespaceRegistry().Return(registry)
	shardContext.EXPECT().GetClusterMetadata().Return(clusterMetadata)

	// Closed run: not running, and its execution info points at the run created by the reset.
	closedParentInfo := &persistencespb.WorkflowExecutionInfo{
		ResetRunId: currentRunID,
	}
	closedParentState := workflow.NewMockMutableState(ctrl)
	closedParentState.EXPECT().IsWorkflowExecutionRunning().Return(false)
	closedParentState.EXPECT().GetExecutionInfo().Return(closedParentInfo)

	// Current run: running, knows the child, and tracks it as initialized after the reset point.
	childInfo := &persistencespb.ChildExecutionInfo{
		StartedEventId:    int64(10), // the started event has already been recorded.
		StartedWorkflowId: childWorkflowID,
	}
	trackedChildren := map[string]*persistencespb.ResetChildInfo{
		trackedChildKey: {},
	}
	initiatedEvent := &historypb.HistoryEvent{
		EventType: enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED,
		Attributes: &historypb.HistoryEvent_StartChildWorkflowExecutionInitiatedEventAttributes{
			StartChildWorkflowExecutionInitiatedEventAttributes: &historypb.StartChildWorkflowExecutionInitiatedEventAttributes{
				WorkflowType: &commonpb.WorkflowType{Name: childType},
				WorkflowId:   childWorkflowID,
			},
		},
	}
	currentParentState := workflow.NewMockMutableState(ctrl)
	currentParentState.EXPECT().IsWorkflowExecutionRunning().Return(true)
	currentParentState.EXPECT().GetNextEventID().Return(int64(10))
	currentParentState.EXPECT().GetChildExecutionInfo(anyArg).Return(childInfo, true)
	currentParentState.EXPECT().GetChildrenInitializedPostResetPoint().Return(trackedChildren)
	currentParentState.EXPECT().GetChildExecutionInitiatedEvent(anyArg, anyArg).Return(initiatedEvent, nil)

	closedParentLease := ndc.NewMockWorkflow(ctrl)
	// The closed run's mutable state is read exactly once.
	closedParentLease.EXPECT().GetMutableState().Return(closedParentState)
	closedParentLease.EXPECT().GetReleaseFn().Return(func(_ error) {})
	currentParentLease := ndc.NewMockWorkflow(ctrl)
	// The current run's mutable state is read repeatedly.
	currentParentLease.EXPECT().GetMutableState().Return(currentParentState).AnyTimes()
	currentParentLease.EXPECT().GetReleaseFn().Return(func(_ error) {})

	consistencyChecker := api.NewMockWorkflowConsistencyChecker(ctrl)
	consistencyChecker.EXPECT().
		GetWorkflowLeaseWithConsistencyCheck(anyArg, anyArg, anyArg, closedParentKey, anyArg).
		Return(closedParentLease, nil)
	consistencyChecker.EXPECT().
		GetWorkflowLeaseWithConsistencyCheck(anyArg, anyArg, anyArg, currentParentKey, anyArg).
		Return(currentParentLease, nil)

	resp, err := Invoke(ctx, request, shardContext, consistencyChecker)
	require.ErrorIs(t, err, consts.ErrChildExecutionNotFound)
	require.Nil(t, resp)
	// Forwarding rewrites the request in place so that it addresses the current run.
	require.Equal(t, currentRunID, request.ParentExecution.RunId)
}
Loading