[Scheduled Actions V2] Invoker logic #7152

FrontendClient workflowservice.WorkflowServiceClient
}

executorTaskExecutor struct {

I'm not sure about how everyone else feels, but this name makes my eyes bleed. At the same time, I didn't want to be inconsistent with other HSM naming. Maybe the path of least resistance is to just use a few synonyms for this? Like, `starterTaskExecutor` (and rename the state machine to `Starter`.. or something like that). Really open to suggestions. Ditto for renaming the tasks. In general I'm not satisfied with the naming.
It's not just a starter though, right? Will this also e.g. terminate workflows? You can call it an invoker maybe?
True, it also terminates/cancels as needed. `Invoker` works for me!

}

// TODO - set last completion result/continued failure
// TODO - set search attributes

I don't think there's anything blocking me from passing the search attributes through to `StartWorkflowExecution`, I just hadn't closed off the TODO. Will follow up.
You can keep a rate limiter in the executor. If the rate limiter is out of tokens, you'll have to back off either by failing the task, or committing any pending progress and adding a persistent timer task. It's dependent on how long you need to wait for the next token. Do not wait on the rate limiter in the shared executor.
components/scheduler/executor.go
Outdated
// We want Executor to immediately wake and attempt to buffer when new starts
// are added.
e.NextInvocationTime = nil
e.NextInvocationTime = timestamppb.New(event.Deadline)

Shouldn't this always be the time of the first buffered start (chronologically)? I'm a bit rusty on the details so please correct me if I'm wrong. I'm just worried that this will invalidate a pending task if the event's buffered starts are further in the future.
> Shouldn't this always be the time of the first buffered start (chronologically)?

Yes, it should; that's what `Deadline` gets set to. I suppose we could get rid of `Deadline` and just compute it within the transition, though, I'll make that change.

> I'm just worried that this will invalidate a pending task if the event's buffered starts are further in the future.

Is only one pending timer task allowed? If so, yes, that could happen presently.

// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.

nit:
- // Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
- //
- // Copyright (c) 2020 Uber Technologies, Inc.
+ // Copyright (c) 2025 Temporal Technologies Inc. All rights reserved.
I'll follow up in a separate PR with this since it's from `make generate`.

rateLimitedDetails struct {
// The requested interval to delay processing by rescheduilng.
Delay time.Duration

Any reason to export this field?
Nope, will fix.
It needed to be exported in the workflow impl because it was serialized as a local activity result. I don't know what's required here.

"google.golang.org/protobuf/types/known/timestamppb"
)

type (

Please stop grouping types, it's not a good practice. It generally messes with the generated docs.
Please stop recommending against a perfectly good style that's already pervasive in the codebase without a team agreement that we want to deprecate it.

isRunning := len(scheduler.Info.RunningWorkflows) > 0

// Resolve overlap policies and prepare next workflows to start.
action := scheduler1.ProcessBuffer(executor.BufferedStarts, isRunning, scheduler.resolveOverlapPolicy)

Does this actually drain the entire buffer? I only see one action being returned here. (I don't have deep familiarity with the code so will just defer to you to confirm).
> Does this actually drain the entire buffer? I only see one action being returned here.

It depends on the overlap policy; if something like `OVERLAP_ALL` is specified, it'll drain the buffer. It drains as much of the buffer as possible; I guess I could rename the calling function from `drainBuffer` to `processBuffer` as well.
I'd agree with that rename.. "drainBuffer" sounds like it'll be empty when it returns

}
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
defer cancelFunc()
_, err := e.HistoryClient.TerminateWorkflowExecution(ctx, request)

Wondering why you start the workflow with the frontend client and terminate it with the history client. Why not be consistent here?
This is what the existing workflow scheduler does; I believe that `StartWorkflowExecution` goes through frontend to make sure metering is applied, which wouldn't matter for `terminateWorkflow`/`cancelWorkflow`. Though, I'm not sure the reason to have the other two not go through frontend - @dnr ?
Yes, it goes through frontend for metering, and also namespace rps limit for safety.
Terminate/cancel could also, but I had already written them to go to history and it didn't seem worth changing at the time. I don't know if terminate/cancel are metered.. if they are, and if there's not a special case to exclude calls from schedules, that would be a visible metering change, so please check with someone who knows that stuff if you want to do that.

}

return Executor{
ExecutorInternal: prevExecutor.ExecutorInternal,

Note, you're not cloning here; when you change the code to release the lock, you'll want to clone.
I probably wouldn't return the whole structs in these `load` functions; instead, take only what is relevant for the executor functionality.
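To illustrate both points in the comment above (clone before the lock is released, and load only what the executor needs), here is a small sketch. All of the types and field names are hypothetical stand-ins rather than the PR's real structs; if the real state is a protobuf message, `proto.Clone` would likely be the usual tool instead of the manual copies shown here.

```go
package main

import "sync"

// invokerState is a hypothetical stand-in for the persisted state machine data.
type invokerState struct {
	BufferedStarts []string       // IDs of starts waiting to run
	Attempts       map[string]int // attempt count per start ID
	Unrelated      []byte         // something the executor never reads
}

// node guards the state with a lock, standing in for the real state access.
type node struct {
	mu    sync.Mutex
	state invokerState
}

// executorInput holds only what the executor needs, detached from the node.
type executorInput struct {
	Starts   []string
	Attempts map[string]int
}

// load copies the relevant fields while the lock is held, so the caller can
// keep using the result after the lock is released without racing against
// later mutations of the node's state.
func (n *node) load() executorInput {
	n.mu.Lock()
	defer n.mu.Unlock()
	in := executorInput{
		Starts:   append([]string(nil), n.state.BufferedStarts...),
		Attempts: make(map[string]int, len(n.state.Attempts)),
	}
	for id, attempt := range n.state.Attempts {
		in.Attempts[id] = attempt
	}
	return in
}
```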

errTypeRetryLimitExceeded = "RetryLimitExceeded"
errTypeRateLimited = "RateLimited"
errTypeAlreadyStarted = "serviceerror.WorkflowExecutionAlreadyStarted"

Ideally we'd use the standard errors package instead of performing string comparisons.

// translateError converts a dependent service error into an application error.
// Errors are classified between retryable and non-retryable.
func translateError(err error, msgPrefix string) error {

Not sure I understand why you need this translation here. Why can't you use the original error? What's the value in wrapping with ApplicationError? That's an SDK concept and isn't relevant here.
It was to classify errors as retryable/non-retryable in a single point. Existing scheduler uses application errors to signal that from the local activities. If it's not a common pattern, I'll get rid of it here.
👍 temporal.ApplicationError is an sdk thing, don't use it (or any error wrapping) here

}

if env.Now().After(e.startWorkflowDeadline(scheduler, start)) {
// Drop expired starts.

We should add a counter in ScheduleInfo for this case
Will do.
Going to add a TODO and add a follow-up PR so I don't have to block this PR on waiting for API approvals.
I think that this should fit into `ScheduleInfo.MissedCatchupWindow`. I'll use that.

isRunning := len(scheduler.Info.RunningWorkflows) > 0

// Resolve overlap policies and prepare next workflows to start.
action := scheduler1.ProcessBuffer(executor.BufferedStarts, isRunning, scheduler.resolveOverlapPolicy)

This is good for now, when "scheduler1" is deprecated and removed, it might be worth taking a look to see if `scheduler1.ProcessBuffer` can be refactored to be a more natural fit here

FirstExecutionRunId: target.RunId,
},
}
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())

You should set callerinfo on this context (and all other client contexts) to get proper prioritization and stuff. Search for `headers.SetCallerInfo` for examples. The workflow impl gets this from `pernsworkermanager`
Thanks, will fix.

// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package scheduler_test

This looks great.
Have you looked at the old workflow_test.go that much? Most of the detailed behaviors of the old impl are actually tested there; the functional tests are much more limited. Obviously it would be a bunch of work to port over, and a bunch is not applicable, but I think it would be pretty valuable to preserve most of those cases.
I'm thinking ones like `TestUpdateBetweenNominalAndJitter`, `TestUpdateNotRetroactive`, `TestBackfillInclusiveStartEnd`, and maybe even `TestHugeBackfillBuffer` though that would probably need more modification.
At least, maybe use it as inspiration for future tests :)
Yes, good idea! I'll port over the tests you called out and give workflow_test.go another pass to see what else might make sense.
I gave the existing tests in `workflow_test` a more thorough look. At a high level, I think that most of these cases actually are covered in the new tests, just broken up closer to each component. Some of the Update specs deal with behavior we'll implement synchronously (like clearing the buffer when a schedule's action changes), so they aren't represented yet. Others, such as `TestLotsOfIterations`, the CAN tests, and the Signal tests, are obsolete given the state machine architecture. The `ExitScheduleWorkflow` tests I'd consider analogously covered by the "empty buffer"/"no work to do" tests.
- I've brought a few more test cases over relevant to the Invoker
- `TestUpdateBetweenNominalAndJitter`: this tests this branch in existing scheduler, which maps to this branch in SpecProcessor. I've added the test case to SpecProcessor.
- `TestUpdateNotRetroactive`: I think this test case will come later, with the update handler, since we'll be processing updates synchronously where possible.
- `TestBackfillInclusiveStartEnd`: I'll make sure this is in the spec for the Backfiller.
- `TestHugeBackfillBuffer`: ditto

// Buffer should be empty, scheduler metadata should be updated.
schedulerSm, err := hsm.MachineData[scheduler.Scheduler](e.schedulerNode)
require.NoError(e.T(), err)
require.Equal(e.T(), 0, len(executor.BufferedStarts))

- require.Equal(e.T(), 0, len(executor.BufferedStarts))
+ require.Empty(e.T(), executor.BufferedStarts)
reads nicer

CanceledTerminatedCountAsFailures bool // Whether cancelled+terminated count for pause-on-failure
RecentActionCount int // How many recent actions are recorded in SchedulerInfo.

nit: This comment is unclear to me, could you please clarify?

`ExecutionTimeout is the timeout for executing a single scheduler task.`,
ServiceCallTimeout = dynamicconfig.NewGlobalDurationSetting(
"component.scheduler.serviceCallTimeout",
5*time.Second,

It may cause problems at 10 seconds since the call takes up an executor in the history service. We may be able to put this on the outbound queue if it becomes an issue. In any case the fact that it's configurable is great. May want to ask @yycptt and @prathyushpv if they see an issue with this.

@@ -64,16 +67,22 @@ func (root) IsWorkflowExecutionRunning() bool {
return true
}

func (s fakeEnv) Access(
func newFakeEnv() *fakeEnv {

I would say we move this to the `hsmtest` (or maybe it already exists there) but it's really not critical since we will rewrite all of this soon.

// Add Executor sub state machine node
executor := scheduler.NewExecutor()
_, err = schedulerNode.AddChild(scheduler.ExecutorMachineKey, *executor)
executor := scheduler.NewInvoker()

Fix the comment and the variable name here and anywhere else.

// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.

nit: 2025 and remove Uber.

result.OverlapSkipped = action.OverlapSkipped

// Add starting workflows to result, trim others.
for _, start := range readyStarts {

Note that you'll likely want to check if there's anything to discard just before completing the task as more actions may get invalidated by then.

continue
}

if env.Now().After(e.startWorkflowDeadline(scheduler, start)) {

Note that once a request is put in the buffer you should execute it to completion, since you're risking creating orphans with task retries.
Not 100% sure what the implication of these orphaned workflows would be. At minimum it may affect counts that we maintain.

return nil, errRetryLimitExceeded
}

// Get rate limiter permission once per buffered start, on the first attempt only.

Why?

func (e invokerTaskExecutor) newContext(namespace string) (context.Context, context.CancelFunc) {
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
ctx = headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(namespace))

I wonder if this should already be set on the context passed to our task executors, worth checking. And see comment above about propagating that context.

// Don't count "already started" for the error metric or retry, as it is most likely
// due to misconfiguration.
if !isAlreadyStartedError(err) {

Should this be counted as a completed start as it could be as a result of a retry?

What changed?
The Invoker (start workflow) logic is added for HSM scheduler.
A few differences between workflow scheduler and HSM scheduler execution logic:
- `ProcessBuffer` task is 1:1 with a call to `ProcessBuffer`; draining is not attempted more than once on the same task.
- `ProcessBuffer` task is scheduled to wake after the delay.
- `ProcessBuffer` is used to drive retries.
- `BufferedStart` has its own `Attempt` and `BackoffTime` to manage retries. The `Execute` task is therefore idempotent.
- `startWorkflow`, `terminateWorkflow`, and `cancelWorkflow` are no longer called within local activities. Instead, they are called with a short (5 sec) deadline within the `Execute` task.
- `Execute` task is generated whenever there are BufferedStarts, CancelWorkflows, or TerminateWorkflows available for execution. An `Execute` task will only ever make a single attempt on each work item before rescheduling a `ProcessBuffer` task.
- `Execute` task completion will always schedule a `ProcessBuffer` task after completing work (unless the Execute task was a no-op). This is to avoid a race condition between reading the Invoker's current state, and choosing a transition (because applying a transition necessitates mutating the state field, which I'd rather just leave as-is).
Still missing in HSM Invoker:
- `LastCompletionResult` also isn't wired up
How did you test it?
Potential risks