[Scheduled Actions V2] Backfiller component #7336

Open · wants to merge 2 commits into sched2_exe_p2

Conversation

lina-temporal (Contributor):

What changed?

Added the HSM Scheduler's Backfiller component, responsible for buffering manual actions.

Key differences between workflow scheduler and HSM backfiller logic:

  • A Backfiller component is 1:1 with each backfill request: it is spawned on request and deleted on completion.
  • Each Backfiller generates its own unique ID at spawn, which is used as part of the generated request IDs for deduplication.
  • When the Invoker's buffer is full, the Backfiller exponentially backs off and retries filling, giving the Invoker a chance to catch up (see the sketch after this list).
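
To illustrate the backoff behavior, here is a minimal sketch; the constant values and function name are hypothetical, not taken from this PR:

package main

import (
	"fmt"
	"time"
)

// Hypothetical tuning values, not taken from this PR.
const (
	initialBackoff = time.Second
	maxBackoff     = 5 * time.Minute
)

// nextRetryDelay doubles the delay per attempt and caps it at maxBackoff,
// giving the Invoker a chance to drain its buffer between fill attempts.
func nextRetryDelay(attempt int) time.Duration {
	delay := initialBackoff << attempt
	if delay <= 0 || delay > maxBackoff {
		return maxBackoff
	}
	return delay
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Printf("attempt %d: retry in %v\n", attempt, nextRetryDelay(attempt))
	}
}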

How did you test it?

Potential risks

  • Not in production yet.
  • The new implementation may miss subtle behavior from the previous backfill mechanism, although I believe the test cases represent it well.

lina-temporal requested a review from a team as a code owner on February 13, 2025.
@bergundy (Member) left a comment:

Overall this looks great. Didn't have a lot of comments here.

// The Backfiller sub state machine is responsible for buffering manually
// requested actions. Each backfill request has its own Backfiller node.
Backfiller struct {
	*schedulespb.BackfillerInternal
bergundy (Member):

nit: you don't need to suffix with Internal IMHO; all of the APIs defined in server/api are considered internal.

Suggested change:
- *schedulespb.BackfillerInternal
+ *schedulespb.Backfiller

func (b Backfiller) SetState(_ BackfillerMachineState) {}

func (b Backfiller) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) {
	return nil, nil
bergundy (Member):

Looks like the implementation is missing.
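
Given the tasks() helper shown later in this review, one plausible completion (a sketch, not necessarily the PR's final resolution) is to regenerate the single backfill task from the persisted deadline:

func (b Backfiller) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) {
	// Re-derive the Backfiller's single task from its persisted
	// NextInvocationTime, mirroring the tasks() helper below.
	return b.tasks()
}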

})
}

func (backfillerTaskExecutor) enqueue(schedulerNode *hsm.Node, starts []*schedulespb.BufferedStart) error {
bergundy (Member):

This seems like something that can be shared with the root scheduler's implementation.

Comment on lines +208 to +209
now := env.Now()
nowpb := timestamppb.New(now)
bergundy (Member):

I wonder if you're going to want to be more deterministic than this and take the time from the trigger request. It may come in handy for resolving conflicts when two clusters become active.
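
A hypothetical shape for that, with the request accessor name assumed rather than taken from this PR:

// Hypothetical: prefer a timestamp carried on the trigger request so that
// two clusters replaying the same request derive the same "now".
now := env.Now()
if t := request.GetRequestTime(); t != nil { // GetRequestTime is an assumed field
	now = t.AsTime()
}
nowpb := timestamppb.New(now)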

Comment on lines +241 to +242
err = backfillerNode.Parent.Walk(func(node *hsm.Node) error {
	if node.Key.Type == BackfillerMachineType {
bergundy (Member):

You can use the hsm.Collection abstraction here:

func NewCollection[T any](node *Node, stateMachineType string) Collection[T] {

	return
}
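
A sketch of the replacement, assuming Collection exposes a List method over matching child nodes (method name not verified here):

// Enumerate Backfiller nodes directly instead of walking the whole
// tree and filtering by machine type.
backfillers := hsm.NewCollection[Backfiller](backfillerNode.Parent, BackfillerMachineType)
for _, node := range backfillers.List() { // List is assumed
	// ... per-Backfiller logic previously inside the Walk callback ...
	_ = node
}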

backfillerCount = max(1, backfillerCount)
bergundy (Member):

Want to add a comment that this is to prevent division by zero?
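
For example, the guard with the requested comment could read:

// backfillerCount is used as a divisor below; clamp it to at least 1
// to prevent division by zero when no Backfillers exist.
backfillerCount = max(1, backfillerCount)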

}

func (BackfillTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error {
	// Backfiller only has a single task/state, so no validation is done here.
bergundy (Member):

You'll need a way to validate the task (probably by checking if there's an expected action for the given deadline). Otherwise, a standby cluster won't be able to discard these tasks when they've been executed in the active cluster.
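
One possible shape for that validation, sketched under the assumption that hsm.MachineData can load the Backfiller state here; this is not confirmed as the PR's resolution:

func (t BackfillTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error {
	// Sketch: discard the task if its deadline no longer matches the
	// machine's next invocation time, so a standby cluster can drop
	// tasks that were already executed on the active side.
	b, err := hsm.MachineData[Backfiller](node) // assumed accessor
	if err != nil {
		return err
	}
	if !b.NextInvocationTime.AsTime().Equal(t.deadline) {
		return fmt.Errorf("backfill task deadline %v does not match machine state", t.deadline)
	}
	return nil
}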

bergundy (Member):

You can reach out to @yycptt for guidance in case I can't do a follow-up for this review.

Comment on lines +65 to +75
func (b Backfiller) tasks() ([]hsm.Task, error) {
	return []hsm.Task{BackfillTask{deadline: b.NextInvocationTime.AsTime()}}, nil
}

func (b Backfiller) output() (hsm.TransitionOutput, error) {
	tasks, err := b.tasks()
	if err != nil {
		return hsm.TransitionOutput{}, err
	}
	return hsm.TransitionOutput{Tasks: tasks}, nil
}
bergundy (Member):

I was looking for this in backfiller.go; I've gotten used to seeing task generation next to the state machine transition definitions.

@@ -12,6 +12,7 @@ breaking:
# Uncomment this to temporarily ignore specific files or directories:
ignore:
- temporal/server/api/historyservice/v1/
- temporal/server/api/schedule/v1/message.proto
bergundy (Member):

Could you put a comment here to remove this exception once this PR is merged? Also, we should remove the exception for historyservice.
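
In the config file's own style, that annotation might look like:

ignore:
  # TODO: remove this exception once the Backfiller PR is merged.
  - temporal/server/api/schedule/v1/message.proto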
