Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Scheduled Actions V2] Invoker logic #7152

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open

Conversation

lina-temporal
Copy link
Contributor

@lina-temporal lina-temporal commented Jan 23, 2025

What changed?

The Invoker (start workflow) logic is added for HSM scheduler.

A few differences between workflow scheduler and HSM scheduler execution logic:

  • A ProcessBuffer task is 1:1 with a call to ProcessBuffer; draining is not attempted more than once on the same task.
  • Any time processing indicates that a delay must be taken (rate limiter, exponential backoff..), another ProcessBuffer task is scheduled to wake after the delay. ProcessBuffer is used to drive retries.
    • In the event that some external event must occur for execution to continue (such as a running workflow closing), the Executor is transitioned to waiting/idle, and no further action will be taken until another state machine wakes it back up.
  • Each BufferedStart has its own Attempt and BackoffTime to manage retries. The Execute task is therefore idempotent.
    • HSM Scheduler can therefore continue to accept API calls while a BufferedStart is retrying, and each individual BufferedStart can make retry progress without having to hold a workflow lock.
  • startWorkflow, terminateWorkflow, and cancelWorkflow are no longer called within local activities. Instead, they are called with a short (5 sec) deadline within the Execute task.
  • A single Execute task is generated whenever there are BufferedStarts, CancelWorkflows, or TerminateWorkflows available for execution. An Execute task will only ever make a single attempt on each work item before rescheduling a ProcessBuffer task.
    • Execute task completion will always schedule a ProcessBuffer task after completing work (unless the Execute task was a no-op). This is to avoid a race condition between reading the Invoker's current state, and choosing a transition (because applying a transition necessitates mutating the state field, which I'd rather just leave as-is).

Still missing in HSM Invoker:

  • Workflow Watcher support. This is something we don't plan to touch until the CHASM port over.
    • This implies LastCompletionResult also isn't wired up
  • Rate Limiter support

How did you test it?

  • New tests
  • go test -v && make lint

Potential risks

  • HSM scheduler isn't presently deployed

@lina-temporal lina-temporal requested review from bergundy and dnr January 23, 2025 20:48
@lina-temporal lina-temporal requested a review from a team as a code owner January 23, 2025 20:48
FrontendClient workflowservice.WorkflowServiceClient
}

executorTaskExecutor struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about how everyone else feels, but this name makes my eyes bleed. At the same time, I didn't want to be inconsistent with other HSM naming. Maybe the path of least resistance is to just use a few synonyms for this? Like, starterTaskExecutor (and rename the state machine to Starter.. or something like that). Really open to suggestions. Ditto for renaming the tasks. In general I'm not satisfied with the naming.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not just a starter though, right? Will this also e.g. terminate workflows? You can call it an invoker maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, it also terminates/cancels as needed. Invoker works for me!

}

// TODO - set last completion result/continued failure
// TODO - set search attributes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's anything blocking me from passing the search attributes through to StartWorkflowExecution, I just hadn't closed off the TODO. Will follow up.

@bergundy
Copy link
Member

Rate Limiter support. The existing rate limiter acts per-namespace (given it is part of per-NS worker), and I'm not sure of the right way to wire that into an HSM executor (which is scoped wider than a single namespace).

You can keep a rate limiter in the executor. If the rate limiter is out of tokens, you'll have to back off either by failing the task, or committing any pending progress and adding a persistent timer task. It's dependent on how long you need to wait for the next token. Do not wait on the rate limiter in the shared executor.

// We want Executor to immediately wake and attempt to buffer when new starts
// are added.
e.NextInvocationTime = nil
e.NextInvocationTime = timestamppb.New(event.Deadline)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this always be the time of the first buffered start (chronologically)? I'm a bit rusty on the details so please correct me if I'm wrong. I'm just worried that this will invalidate a pending task if the event's buffered starts are further in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this always be the time of the first buffered start (chronologically)?

Yes, it should; that's what Deadline gets set to. I suppose we could get rid of Deadline and just compute it within the transition, though, I'll make that change.

I'm just worried that this will invalidate a pending task if the event's buffered starts are further in the future.

Is only one pending timer task allowed? If so, yes, that could happen presently.

Comment on lines 3 to 5
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll follow up in a separate PR with this since it's from make generate.

FrontendClient workflowservice.WorkflowServiceClient
}

executorTaskExecutor struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not just a starter though, right? Will this also e.g. terminate workflows? You can call it an invoker maybe?


rateLimitedDetails struct {
// The requested interval to delay processing by rescheduilng.
Delay time.Duration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to export this field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, will fix.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needed to be exported in the workflow impl because it was serialized as an local activity result. I don't know what's required here.

"google.golang.org/protobuf/types/known/timestamppb"
)

type (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please stop grouping types, it's not a good practice. It generally messes with the generated docs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please stop recommending against a perfectly good style that's already pervasive in the codebase without a team agreement that we want to deprecate it.

isRunning := len(scheduler.Info.RunningWorkflows) > 0

// Resolve overlap policies and prepare next workflows to start.
action := scheduler1.ProcessBuffer(executor.BufferedStarts, isRunning, scheduler.resolveOverlapPolicy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually drain the entire buffer? I only see one action being returned here. (I don't have deep familiarity with the code so will just defer to you to confirm).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually drain the entire buffer? I only see one action being returned here.

It depends on the overlap policy; if something like OVERLAP_ALL is specified, it'll drain the buffer. It drains as much of the buffer as possible; I guess I could rename the calling function from drainBuffer to processBuffer as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd agree with that rename.. "drainBuffer" sounds like it'll be empty when it returns

}
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
defer cancelFunc()
_, err := e.HistoryClient.TerminateWorkflowExecution(ctx, request)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why you start the workflow with the frontend client and terminate it with the history client. Why not be consistent here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what the existing workflow scheduler does; I believe that StartWorkflowExecution goes through frontend to make sure metering is applied, which wouldn't matter for terminateWorkflow/cancelWorkflow. Though, I'm not sure the reason to have the other two not go through frontend - @dnr ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it goes through frontend for metering, and also namespace rps limit for safety.

Terminate/cancel could also, but I had already written them to go to history and it didn't seem worth changing at the time. I don't know if terminate/cancel are metered.. if they are, and if there's not a special case to exclude calls from schedules, that would be a visible metering change, so please check with someone who knows that stuff if you want to do that.

}

return Executor{
ExecutorInternal: prevExecutor.ExecutorInternal,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, you're not cloning here, when you change the code to release the lock, you'll want to clone.
I probably wouldn't return the whole structs in these load functions, instead take only what is relevant for the executor functionality.

Comment on lines 85 to 87
errTypeRetryLimitExceeded = "RetryLimitExceeded"
errTypeRateLimited = "RateLimited"
errTypeAlreadyStarted = "serviceerror.WorkflowExecutionAlreadyStarted"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we'd use the standard errors package instead of performing string comparisons.


// translateError converts a dependent service error into an application error.
// Errors are classified between retryable and non-retryable.
func translateError(err error, msgPrefix string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand why you need this translation here. Why can't you use the original error? What's the value in wrapping with ApplicationError? That's an SDK concept and isn't relevant here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to classify errors as retryable/non-retryable in a single point. Existing scheduler uses application errors to signal that from the local activities. If it's not a common pattern, I'll get rid of it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 temporal.ApplicationError is an sdk thing, don't use it (or any error wrapping) here

Base automatically changed from sched2_exe_p1 to main January 27, 2025 17:14
"google.golang.org/protobuf/types/known/timestamppb"
)

type (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please stop recommending against a perfectly good style that's already pervasive in the codebase without a team agreement that we want to deprecate it.


rateLimitedDetails struct {
// The requested interval to delay processing by rescheduilng.
Delay time.Duration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needed to be exported in the workflow impl because it was serialized as an local activity result. I don't know what's required here.

isRunning := len(scheduler.Info.RunningWorkflows) > 0

// Resolve overlap policies and prepare next workflows to start.
action := scheduler1.ProcessBuffer(executor.BufferedStarts, isRunning, scheduler.resolveOverlapPolicy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd agree with that rename.. "drainBuffer" sounds like it'll be empty when it returns

}

if env.Now().After(e.startWorkflowDeadline(scheduler, start)) {
// Drop expired starts.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a counter in ScheduleInfo for this case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to add a TODO and add a follow-up PR so I don't have to block this PR on waiting for API approvals.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this should fit into ScheduleInfo.MissedCatchupWindow. I'll use that.

isRunning := len(scheduler.Info.RunningWorkflows) > 0

// Resolve overlap policies and prepare next workflows to start.
action := scheduler1.ProcessBuffer(executor.BufferedStarts, isRunning, scheduler.resolveOverlapPolicy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good for now, when "scheduler1" is deprecated and removed, it might be worth taking a look to see if scheduler1.ProcessBuffer can be refactored to be a more natural fit here

FirstExecutionRunId: target.RunId,
},
}
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should set callerinfo on this context (and all other client contexts) to get proper prioritization and stuff. Search for headers.SetCallerInfo for examples. The workflow impl gets this from pernsworkermanager

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will fix.

}
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
defer cancelFunc()
_, err := e.HistoryClient.TerminateWorkflowExecution(ctx, request)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it goes through frontend for metering, and also namespace rps limit for safety.

Terminate/cancel could also, but I had already written them to go to history and it didn't seem worth changing at the time. I don't know if terminate/cancel are metered.. if they are, and if there's not a special case to exclude calls from schedules, that would be a visible metering change, so please check with someone who knows that stuff if you want to do that.


// translateError converts a dependent service error into an application error.
// Errors are classified between retryable and non-retryable.
func translateError(err error, msgPrefix string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 temporal.ApplicationError is an sdk thing, don't use it (or any error wrapping) here

// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package scheduler_test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great.

Have you looked at the old workflow_test.go that much? Most of the detailed behaviors of the old impl are actually tested there; the functional tests are much more limited. Obviously it would be a bunch of work to port over, and a bunch is not applicable, but I think it would be pretty valuable to preserve most of those cases.

I'm thinking ones like TestUpdateBetweenNominalAndJitter, TestUpdateNotRetroactive, TestBackfillInclusiveStartEnd, and maybe even TestHugeBackfillBuffer though that would probably need more modification.

At least, maybe use it as inspiration for future tests :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good idea! I'll port over the tests you called out and give workflow_test.go another pass to see what else might make sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave the existing tests in workflow_test a more thorough look. At a high level, I think that most of these cases actually are covered in the new tests, just broken up closer to each component. Some of the Update specs that deal with behavior we'll implement synchronously (like clearing the buffer when a schedule's action changes), so they aren't represented yet. Others, such as TestLotsOfIterations, the CAN tests, and the Signal tests, are obsolete given the state machine architecture. The ExitScheduleWorkflow tests I'd consider analogously covered by the "empty buffer"/"no work to do" tests.

  • I've brought a few more test cases over relevant to the Invoker
  • TestUpdateBetweenNominalAndJitter: this tests this branch in existing scheduler, which maps to this branch in SpecProcessor. I've added the test case to SpecProcessor.
  • TestUpdateNotRetroactive: I think this test case will come later, with the update handler, since we'll be processing updates synchronously where possible.
  • TestBackfillInclusiveStartEnd: I'll make sure this is in the spec for the Backfiller.
  • TestHugeBackfillBuffer: ditto

// Buffer should be empty, scheduler metadata should be updated.
schedulerSm, err := hsm.MachineData[scheduler.Scheduler](e.schedulerNode)
require.NoError(e.T(), err)
require.Equal(e.T(), 0, len(executor.BufferedStarts))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
require.Equal(e.T(), 0, len(executor.BufferedStarts))
require.Empty(e.T(), executor.BufferedStarts)

reads nicer

@lina-temporal lina-temporal changed the title [Scheduled Actions V2] Executor logic [Scheduled Actions V2] Invoker logic Feb 11, 2025
CanceledTerminatedCountAsFailures bool // Whether cancelled+terminated count for pause-on-failure
RecentActionCount int // How many recent actions are recorded in SchedulerInfo.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This comment is unclear to me, could you please clarify?

`ExecutionTimeout is the timeout for executing a single scheduler task.`,
ServiceCallTimeout = dynamicconfig.NewGlobalDurationSetting(
"component.scheduler.serviceCallTimeout",
5*time.Second,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may cause problems at 10 seconds since the call takes up an executor in the history service. We may be able to put this on the outbound queue if it becomes an issue. In any case the fact that it's configurable is great. May want to ask @yycptt and @prathyushpv if they see an issue with this.

@@ -64,16 +67,22 @@ func (root) IsWorkflowExecutionRunning() bool {
return true
}

func (s fakeEnv) Access(
func newFakeEnv() *fakeEnv {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say we move this to the hsmtest (or maybe it already exists there) but it's really not critical since we will rewrite all of this soon.

Comment on lines 130 to +131
// Add Executor sub state machine node
executor := scheduler.NewExecutor()
_, err = schedulerNode.AddChild(scheduler.ExecutorMachineKey, *executor)
executor := scheduler.NewInvoker()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the comment and the variable name here and anywhere else.

Comment on lines +3 to +5
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 2025 and remove Uber.

result.OverlapSkipped = action.OverlapSkipped

// Add starting workflows to result, trim others.
for _, start := range readyStarts {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that you'll likely want to check if there's anything to discard just before completing the task as more actions may get invalidated by then.

continue
}

if env.Now().After(e.startWorkflowDeadline(scheduler, start)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that once a request is put in the buffer you should execute it to completion it since you're risking creating orphans with task retries.

Not 100% sure what the implication of these orphaned workflows would be. At minimum it may affect counts that we maintain.

return nil, errRetryLimitExceeded
}

// Get rate limiter permission once per buffered start, on the first attempt only.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?


func (e invokerTaskExecutor) newContext(namespace string) (context.Context, context.CancelFunc) {
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
ctx = headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(namespace))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should already be set on the context passed to our task executors, worth checking. And see comment above about propagating that context.


// Don't count "already started" for the error metric or retry, as it is most likely
// due to misconfiguration.
if !isAlreadyStartedError(err) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be counted as a completed start as it could be as a result of a retry?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants