-
Notifications
You must be signed in to change notification settings - Fork 911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add lightweight subqueues for priority #7372
base: priority
Are you sure you want to change the base?
Conversation
@@ -40,7 +40,8 @@ type ( | |||
// genericTaskInfo contains the info for an activity or workflow task | |||
genericTaskInfo struct { | |||
*persistencespb.AllocatedTaskInfo | |||
completionFunc func(*internalTask, taskResponse) | |||
completionFunc func(*internalTask, taskResponse) | |||
backlogSubqueue int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to go away in a later PR, actually
tqCtx context.Context | ||
db *taskQueueDB | ||
config *taskQueueConfig | ||
subqueue int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to go away in a later PR (that merges taskGC into task reader)
common/persistence/sql/task.go
Outdated
idBytes = append(idBytes, namespaceID...) | ||
idBytes = append(idBytes, []byte(name)...) | ||
idBytes = append(idBytes, uint8(taskType)) | ||
if subqueue > 0 { | ||
// FIXME: this should be done so that it's invertible |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll pull this fix into this PR since it's small
// We steal some upper bits of the "row type" field to hold a subqueue index. | ||
// Subqueue 0 must be the same as rowTypeTask (before subqueues were introduced). | ||
// 00000000: task in subqueue 0 | ||
// 00000001: task queue metadata | ||
// xxxxxx1x: reserved | ||
// 00000100: task in subqueue 1 | ||
// nnnnnn00: task in subqueue n, etc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// We steal some upper bits of the "row type" field to hold a subqueue index. | |
// Subqueue 0 must be the same as rowTypeTask (before subqueues were introduced). | |
// 00000000: task in subqueue 0 | |
// 00000001: task queue metadata | |
// xxxxxx1x: reserved | |
// 00000100: task in subqueue 1 | |
// nnnnnn00: task in subqueue n, etc. | |
// We steal some upper bits of the "row type" field to hold a subqueue index. | |
// Subqueue 0 must be the same as rowTypeTask (before subqueues were introduced). | |
// 00000000: task in subqueue 0 (`rowTypeTask`) | |
// 00000001: task queue metadata (`rowTypeTaskQueue`) | |
// xxxxxx1x: reserved | |
// 00000100: task in subqueue 1 | |
// nnnnnn00: task in subqueue n, etc. |
) ([]byte, uint32) { | ||
id := m.taskQueueId(namespaceID, name, taskType) | ||
id := m.taskQueueId(namespaceID, name, taskType, subqueue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we rename name
to taskqueueName
and truncate the comment as the parameter types would be sufficient?
@@ -65,6 +65,35 @@ message TaskQueueInfo { | |||
google.protobuf.Timestamp expiry_time = 6; | |||
google.protobuf.Timestamp last_update_time = 7; | |||
int64 approximate_backlog_count = 8; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a note here and for ack_level
, too, that mentions it's a copy of the subqueue 0
's data? It's mentioned on subqueues, but worth mentioning twice, I think.
func (task *internalTask) getPriority() *commonpb.Priority { | ||
if task.event != nil { | ||
return task.event.AllocatedTaskInfo.GetData().GetPriority() | ||
} else if task.query != nil { | ||
return task.query.request.GetPriority() | ||
} | ||
return nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: how about dropping the else
?
func (task *internalTask) getPriority() *commonpb.Priority { | |
if task.event != nil { | |
return task.event.AllocatedTaskInfo.GetData().GetPriority() | |
} else if task.query != nil { | |
return task.query.request.GetPriority() | |
} | |
return nil | |
} | |
func (task *internalTask) getPriority() *commonpb.Priority { | |
if task.event != nil { | |
return task.event.AllocatedTaskInfo.GetData().GetPriority() | |
} | |
if task.query != nil { | |
return task.query.request.GetPriority() | |
} | |
return nil | |
} |
EDIT: After reading more code, I see the same pattern more often and am unlikely to convince you otherwise ;) But I'll leave my original one here.
@@ -263,6 +266,15 @@ func (task *internalTask) getResponse() (taskResponse, bool) { | |||
return <-task.responseC, true | |||
} | |||
|
|||
func (task *internalTask) getPriority() *commonpb.Priority { | |||
if task.event != nil { | |||
return task.event.AllocatedTaskInfo.GetData().GetPriority() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
side note about existing code:
event
is quite a surprising/confusing naming choice.
|
||
c.loadSubqueuesLocked(subqueues) | ||
|
||
// this should be here now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a TODO comment?
hasDefault := false | ||
for _, s := range subqueues { | ||
if hasDefault = proto.Equal(s.Key, defKey); hasDefault { | ||
break | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could consider this now:
hasDefault := false | |
for _, s := range subqueues { | |
if hasDefault = proto.Equal(s.Key, defKey); hasDefault { | |
break | |
} | |
} | |
hasDefault := slices.ContainsFunc(subqueues, func(s *dbSubqueue) bool { | |
return proto.Equal(s.Key, defKey) | |
}) |
// start ack level + max read level just before the current block | ||
initAckLevel := rangeIDToTaskIDBlock(db.rangeID, db.config.RangeSize).start - 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we make this a function on taskQueueDB
? It exists twice and the two lines are the same AFAICT but have/don't have comments and use different variable names.
return task.event.AllocatedTaskInfo.GetData().GetPriority() | ||
} else if task.query != nil { | ||
return task.query.request.GetPriority() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a comment mentioning Nexus?
return taskQueueState{rangeID: db.rangeID, ackLevel: db.ackLevel}, nil | ||
return taskQueueState{ | ||
rangeID: db.rangeID, | ||
ackLevel: db.subqueues[0].AckLevel, // TODO(pri): cleanup, only used by old backlog manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, it would be nice to have the db.subqueues[0]
meaning spelled out more explicitly for the "legacy" context.
Compare to #7371
What changed?
Why?
How did you test it?
new functional test, unit tests are broken, will be fixed in follow-up PR