[Scheduled Actions V2] Backfiller component #7336
base: sched2_exe_p2
Conversation
Overall this looks great. Didn't have a lot of comments here.
// The Backfiller sub state machine is responsible for buffering manually
// requested actions. Each backfill request has its own Backfiller node.
Backfiller struct {
	*schedulespb.BackfillerInternal
nit: you don't need to suffix with Internal. IMHO, all of the APIs defined in server/api are considered internal.
func (b Backfiller) SetState(_ BackfillerMachineState) {}

func (b Backfiller) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) {
	return nil, nil
Looks like the implementation is missing.
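A minimal sketch of what it could look like, assuming the single buffered task can simply be rebuilt from persisted state via the tasks() helper shown further down:

// Sketch only: regenerate the Backfiller's single task from persisted state.
func (b Backfiller) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) {
	return b.tasks()
}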
	})
}

func (backfillerTaskExecutor) enqueue(schedulerNode *hsm.Node, starts []*schedulespb.BufferedStart) error {
This seems like something that can be shared with the root scheduler's implementation.
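For example (names here are hypothetical), a package-level helper that both executors delegate to:

// Hypothetical shared helper: both the root scheduler's executor and
// backfillerTaskExecutor.enqueue could append buffered starts through this.
// The Scheduler type and its BufferedStarts field are assumed for illustration.
func enqueueBufferedStarts(s *Scheduler, starts []*schedulespb.BufferedStart) {
	s.BufferedStarts = append(s.BufferedStarts, starts...)
}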
now := env.Now()
nowpb := timestamppb.New(now)
I wonder if you're going to want to be more deterministic than this and take the time from the trigger request.
It may come in handy when resolving conflicts when two clusters become active.
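For illustration, a sketch of what that might look like; the request-time field and its accessor are assumptions, not the actual proto shape:

// Sketch only: prefer a timestamp carried on the backfill/trigger request over
// the executor's clock so active and standby clusters derive the same deadline.
now := env.Now()
if reqTime := backfiller.GetRequestTime(); reqTime != nil { // hypothetical proto accessor
	now = reqTime.AsTime()
}
nowpb := timestamppb.New(now)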
err = backfillerNode.Parent.Walk(func(node *hsm.Node) error {
	if node.Key.Type == BackfillerMachineType {
You can use the hsm.Collection abstraction here:
temporal/service/history/hsm/tree.go, line 653 in 9c7d332:
func NewCollection[T any](node *Node, stateMachineType string) Collection[T] {
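Roughly, the walk could then become something like the following; only NewCollection's signature comes from tree.go, the List accessor is assumed here:

// Sketch using hsm.Collection instead of walking the parent node. Only the
// NewCollection signature above is taken from tree.go; List() is assumed.
backfillers := hsm.NewCollection[Backfiller](backfillerNode.Parent, BackfillerMachineType)
for _, node := range backfillers.List() {
	// ... per-Backfiller handling previously done inside Walk ...
	_ = node
}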
	return
}

backfillerCount = max(1, backfillerCount)
Want to add a comment that this is to prevent division by zero?
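Something like this, where the surrounding allotment math is assumed:

// Clamp to at least one Backfiller so the per-backfiller buffer allotment
// computed below never divides by zero.
backfillerCount = max(1, backfillerCount)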
}

func (BackfillTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error {
	// Backfiller only has a single task/state, so no validation is done here.
You'll need a way to validate the task (probably by checking if there's an expected action for the given deadline). Otherwise, a standby cluster won't be able to discard these tasks when they've been executed in the active cluster.
You can reach out to @yycptt for guidance in case I can't do a followup for this review.
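A rough sketch of the kind of check being suggested, assuming hsm.MachineData can load the Backfiller's state and that comparing the task deadline against NextInvocationTime is enough to detect a stale task:

// Sketch only: let a standby cluster discard backfill tasks that no longer
// match the machine's persisted state. hsm.MachineData and the choice of
// error to return are assumptions for illustration.
func (t BackfillTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error {
	b, err := hsm.MachineData[Backfiller](node)
	if err != nil {
		return err
	}
	if !t.deadline.Equal(b.NextInvocationTime.AsTime()) {
		return fmt.Errorf("backfill task deadline no longer matches machine state")
	}
	return nil
}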
func (b Backfiller) tasks() ([]hsm.Task, error) {
	return []hsm.Task{BackfillTask{deadline: b.NextInvocationTime.AsTime()}}, nil
}

func (b Backfiller) output() (hsm.TransitionOutput, error) {
	tasks, err := b.tasks()
	if err != nil {
		return hsm.TransitionOutput{}, err
	}
	return hsm.TransitionOutput{Tasks: tasks}, nil
}
I was looking for this in backfiller.go; I got used to seeing task generation next to the state machine transition definitions.
@@ -12,6 +12,7 @@ breaking:
 # Uncomment this to temporarily ignore specific files or directories:
 ignore:
   - temporal/server/api/historyservice/v1/
+  - temporal/server/api/schedule/v1/message.proto
Could you put a comment to remove this exception once this PR is merged? Also, we should remove the exception for historyservice.
What changed?
Added the HSM Scheduler's Backfiller component, responsible for buffering manual actions.
Key differences between workflow scheduler and HSM backfiller logic:
How did you test it?
go test -v
Potential risks