-
Notifications
You must be signed in to change notification settings - Fork 911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Reset] Handle completion events from restarted children #7295
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ package recordchildworkflowcompleted | |
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
enumspb "go.temporal.io/api/enums/v1" | ||
"go.temporal.io/server/api/historyservice/v1" | ||
|
@@ -62,18 +63,20 @@ func Invoke( | |
|
||
// If the parent is reset, we need to follow a possible chain of resets to deliver the completion event to the correct parent. | ||
redirectCount := 0 | ||
isForwarded := false | ||
for { | ||
resetRunID, err := recordChildWorkflowCompleted(ctx, request, shardContext, workflowConsistencyChecker) | ||
resetRunID, err := recordChildWorkflowCompleted(ctx, request, shardContext, workflowConsistencyChecker, isForwarded) | ||
if errors.Is(err, consts.ErrWorkflowCompleted) { | ||
// if the parent was reset, forward the request to the new run pointed by resetRunID | ||
// Note: An alternative solution is to load the current run here ane compare the originalRunIDs of the current run and the closed parent. | ||
// Note: An alternative solution is to load the current run here and compare the originalRunIDs of the current run and the closed parent. | ||
// If they match, then deliver it to the current run. We should consider this optimization if we notice that reset chain is longer than 1-2 hops. | ||
if resetRunID != "" { | ||
if redirectCount >= maxResetRedirectCount { | ||
return nil, consts.ErrResetRedirectLimitReached | ||
} | ||
redirectCount++ | ||
request.ParentExecution.RunId = resetRunID | ||
isForwarded = true | ||
continue | ||
} | ||
} | ||
|
@@ -91,6 +94,7 @@ func recordChildWorkflowCompleted( | |
request *historyservice.RecordChildExecutionCompletedRequest, | ||
shardContext shard.Context, | ||
workflowConsistencyChecker api.WorkflowConsistencyChecker, | ||
isForwarded bool, | ||
) (string, error) { | ||
resetRunID := "" | ||
parentInitiatedID := request.ParentInitiatedId | ||
|
@@ -153,6 +157,28 @@ func recordChildWorkflowCompleted( | |
return nil, consts.ErrChildExecutionNotFound | ||
} | ||
|
||
childrenInitializedAfterResetPoint := mutableState.GetChildrenInitializedPostResetPoint() | ||
if len(childrenInitializedAfterResetPoint) > 0 { | ||
// This parent was reset and it also has some children that potentially were restarted. | ||
initiatedEvent, err := mutableState.GetChildExecutionInitiatedEvent(ctx, parentInitiatedID) | ||
if err != nil { | ||
return nil, consts.ErrChildExecutionNotFound | ||
} | ||
initiatedAttr := initiatedEvent.GetStartChildWorkflowExecutionInitiatedEventAttributes() | ||
childID := fmt.Sprintf("%s:%s", initiatedAttr.GetWorkflowType().Name, initiatedAttr.GetWorkflowId()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we really should create a util function for this concatenation logic. I think I've seen it in 3 different places. |
||
_, ok := childrenInitializedAfterResetPoint[childID] | ||
if ok { | ||
// The child sending this request was restarted. Do not accept any forwarded completion events because the restarted child will directly send its completion event to this parent. | ||
// Sometimes the old child will race to complete before being restarted and it also happens to have the same initialized event ID. In such cases the result will come as forwarded request which we ignore here. | ||
if isForwarded { | ||
return nil, consts.ErrChildExecutionNotFound | ||
} | ||
// The results are from the child that this parent initiated. We should stop tracking the child and process the result now. | ||
delete(childrenInitializedAfterResetPoint, childID) | ||
mutableState.SetChildrenInitializedPostResetPoint(childrenInitializedAfterResetPoint) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz update |
||
} | ||
} | ||
|
||
completionEvent := request.CompletionEvent | ||
switch completionEvent.GetEventType() { | ||
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm I don't think we need to load the initiatedEvent. Both child workflow type and child workflowID are available in ChildExecutionInfo in mutable state I think.