Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lightweight subqueues for priority #7372

Open
wants to merge 2 commits into
base: priority
Choose a base branch
from

Conversation

dnr
Copy link
Member

@dnr dnr commented Feb 21, 2025

Compare to #7371

What changed?

  • Add subqueues for priority using a new backlog manager with less overhead than a full queue:
    • metadata (ack levels and approximate count) is shared in one database row, minimizing transactions
    • task writer can write to multiple subqueues at once
    • there's one task reader, ack manager, and task gc per subqueue
  • This forks backlog_manager.go and task_writer.go. You can check it out and look at the diff between the old and new files for reviewing.

Why?

  • Support simple priority and later fairness.

How did you test it?

new functional test, unit tests are broken, will be fixed in follow-up PR

@dnr dnr marked this pull request as ready for review February 28, 2025 19:06
@dnr dnr requested a review from a team as a code owner February 28, 2025 19:06
@@ -40,7 +40,8 @@ type (
// genericTaskInfo contains the info for an activity or workflow task
genericTaskInfo struct {
*persistencespb.AllocatedTaskInfo
completionFunc func(*internalTask, taskResponse)
completionFunc func(*internalTask, taskResponse)
backlogSubqueue int
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to go away in a later PR, actually

tqCtx context.Context
db *taskQueueDB
config *taskQueueConfig
subqueue int
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to go away in a later PR (that merges taskGC into task reader)

idBytes = append(idBytes, namespaceID...)
idBytes = append(idBytes, []byte(name)...)
idBytes = append(idBytes, uint8(taskType))
if subqueue > 0 {
// FIXME: this should be done so that it's invertible
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll pull this fix into this PR since it's small

Comment on lines +165 to +171
// We steal some upper bits of the "row type" field to hold a subqueue index.
// Subqueue 0 must be the same as rowTypeTask (before subqueues were introduced).
// 00000000: task in subqueue 0
// 00000001: task queue metadata
// xxxxxx1x: reserved
// 00000100: task in subqueue 1
// nnnnnn00: task in subqueue n, etc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// We steal some upper bits of the "row type" field to hold a subqueue index.
// Subqueue 0 must be the same as rowTypeTask (before subqueues were introduced).
// 00000000: task in subqueue 0
// 00000001: task queue metadata
// xxxxxx1x: reserved
// 00000100: task in subqueue 1
// nnnnnn00: task in subqueue n, etc.
// We steal some upper bits of the "row type" field to hold a subqueue index.
// Subqueue 0 must be the same as rowTypeTask (before subqueues were introduced).
// 00000000: task in subqueue 0 (`rowTypeTask`)
// 00000001: task queue metadata (`rowTypeTaskQueue`)
// xxxxxx1x: reserved
// 00000100: task in subqueue 1
// nnnnnn00: task in subqueue n, etc.

) ([]byte, uint32) {
id := m.taskQueueId(namespaceID, name, taskType)
id := m.taskQueueId(namespaceID, name, taskType, subqueue)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rename name to taskqueueName and truncate the comment as the parameter types would be sufficient?

@@ -65,6 +65,35 @@ message TaskQueueInfo {
google.protobuf.Timestamp expiry_time = 6;
google.protobuf.Timestamp last_update_time = 7;
int64 approximate_backlog_count = 8;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a note here and for ack_level, too, that mentions it's a copy of the subqueue 0's data? It's mentioned on subqueues, but worth mentioning twice, I think.

Comment on lines +269 to +277
func (task *internalTask) getPriority() *commonpb.Priority {
if task.event != nil {
return task.event.AllocatedTaskInfo.GetData().GetPriority()
} else if task.query != nil {
return task.query.request.GetPriority()
}
return nil
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: how about dropping the else?

Suggested change
func (task *internalTask) getPriority() *commonpb.Priority {
if task.event != nil {
return task.event.AllocatedTaskInfo.GetData().GetPriority()
} else if task.query != nil {
return task.query.request.GetPriority()
}
return nil
}
func (task *internalTask) getPriority() *commonpb.Priority {
if task.event != nil {
return task.event.AllocatedTaskInfo.GetData().GetPriority()
}
if task.query != nil {
return task.query.request.GetPriority()
}
return nil
}

EDIT: After reading more code, I see the same pattern more often and am unlikely to convince you otherwise ;) But I'll leave my original one here.

@@ -263,6 +266,15 @@ func (task *internalTask) getResponse() (taskResponse, bool) {
return <-task.responseC, true
}

func (task *internalTask) getPriority() *commonpb.Priority {
if task.event != nil {
return task.event.AllocatedTaskInfo.GetData().GetPriority()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side note about existing code:
event is quite a surprising/confusing naming choice.


c.loadSubqueuesLocked(subqueues)

// this should be here now
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a TODO comment?

Comment on lines +495 to +500
hasDefault := false
for _, s := range subqueues {
if hasDefault = proto.Equal(s.Key, defKey); hasDefault {
break
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could consider this now:

Suggested change
hasDefault := false
for _, s := range subqueues {
if hasDefault = proto.Equal(s.Key, defKey); hasDefault {
break
}
}
hasDefault := slices.ContainsFunc(subqueues, func(s *dbSubqueue) bool {
return proto.Equal(s.Key, defKey)
})

Comment on lines +514 to +515
// start ack level + max read level just before the current block
initAckLevel := rangeIDToTaskIDBlock(db.rangeID, db.config.RangeSize).start - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make this a function on taskQueueDB? It exists twice and the two lines are the same AFAICT but have/don't have comments and use different variable names.

return task.event.AllocatedTaskInfo.GetData().GetPriority()
} else if task.query != nil {
return task.query.request.GetPriority()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a comment mentioning Nexus?

return taskQueueState{rangeID: db.rangeID, ackLevel: db.ackLevel}, nil
return taskQueueState{
rangeID: db.rangeID,
ackLevel: db.subqueues[0].AckLevel, // TODO(pri): cleanup, only used by old backlog manager
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, it would be nice to have the db.subqueues[0] meaning spelled out more explicitly for the "legacy" context.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants