Add heavy subqueues for priority #7371

Closed
37 changes: 37 additions & 0 deletions api/persistence/v1/tasks.go-helpers.pb.go

Some generated files are not rendered by default.

218 changes: 150 additions & 68 deletions api/persistence/v1/tasks.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
@@ -1266,6 +1266,11 @@ these log lines can be noisy, we want to be able to turn on and sample selective
false,
`Use priority-enabled TaskMatcher.`,
)
MatchingPriorityLevels = NewTaskQueueIntSetting(
"matching.priorityLevels",
5,
`Number of simple priority levels`,
)

// keys for history

13 changes: 13 additions & 0 deletions proto/internal/temporal/server/api/persistence/v1/tasks.proto
@@ -65,6 +65,19 @@ message TaskQueueInfo {
google.protobuf.Timestamp expiry_time = 6;
google.protobuf.Timestamp last_update_time = 7;
int64 approximate_backlog_count = 8;
// Subqueues contains one entry for each subqueue in this physical task queue.
// Tasks are split into subqueues to implement priority and fairness.
// Subqueues are indexed starting from 0, the zero subqueue is always present.
// The message at index n describes the subqueue at index n.
repeated SubqueueKey subqueues = 9;
}

message SubqueueKey {
// Each subqueue contains tasks from only one priority level.
int32 priority = 1;

// // Additionally, tasks may be split by a fairness mechanism into buckets.
// int32 fairness_bucket = 2;
}

message TaskKey {
9 changes: 8 additions & 1 deletion service/matching/backlog_manager.go
@@ -63,6 +63,7 @@ type (

backlogManagerImpl struct {
pqMgr physicalTaskQueueManager
subqueue int
tqCtx context.Context
db *taskQueueDB
taskWriter *taskWriter
@@ -87,6 +88,7 @@ var _ backlogManager = (*backlogManagerImpl)(nil)
func newBacklogManager(
tqCtx context.Context,
pqMgr physicalTaskQueueManager,
subqueue int,
config *taskQueueConfig,
taskManager persistence.TaskManager,
logger log.Logger,
@@ -96,6 +98,7 @@ ) *backlogManagerImpl {
) *backlogManagerImpl {
bmg := &backlogManagerImpl{
pqMgr: pqMgr,
subqueue: subqueue,
tqCtx: tqCtx,
matchingClient: matchingClient,
metricsHandler: metricsHandler,
@@ -104,7 +107,11 @@ func newBacklogManager(
config: config,
initializedError: future.NewFuture[struct{}](),
}
bmg.db = newTaskQueueDB(bmg, taskManager, pqMgr.QueueKey(), logger)
sqkey := SubqueueKey{
PhysicalTaskQueueKey: *pqMgr.QueueKey(),
subqueue: subqueue,
}
bmg.db = newTaskQueueDB(bmg, taskManager, sqkey, logger)
bmg.taskWriter = newTaskWriter(bmg)
if config.NewMatcher {
bmg.priTaskReader = newPriTaskReader(bmg)
13 changes: 10 additions & 3 deletions service/matching/config.go
@@ -32,7 +32,6 @@ import (

"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/tqid"
)
@@ -94,6 +93,7 @@ type (
QueryWorkflowTaskTimeoutLogRate dynamicconfig.FloatPropertyFnWithTaskQueueFilter
MembershipUnloadDelay dynamicconfig.DurationPropertyFn
TaskQueueInfoByBuildIdTTL dynamicconfig.DurationPropertyFnWithTaskQueueFilter
PriorityLevels dynamicconfig.IntPropertyFnWithTaskQueueFilter

// Time to hold a poll request before returning an empty response if there are no tasks
LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter
@@ -132,7 +132,6 @@ type (

taskQueueConfig struct {
forwarderConfig
CallerInfo headers.CallerInfo
SyncMatchWaitDuration func() time.Duration
BacklogNegligibleAge func() time.Duration
MaxWaitForPollerBeforeFwd func() time.Duration
@@ -146,6 +145,7 @@ type (
MaxTaskQueueIdleTime func() time.Duration
MinTaskThrottlingBurstSize func() int
MaxTaskDeleteBatchSize func() int
PriorityLevels func() int32

GetUserDataLongPollTimeout dynamicconfig.DurationPropertyFn
GetUserDataMinWaitTime time.Duration
@@ -267,6 +267,7 @@ func NewConfig(
QueryWorkflowTaskTimeoutLogRate: dynamicconfig.MatchingQueryWorkflowTaskTimeoutLogRate.Get(dc),
MembershipUnloadDelay: dynamicconfig.MatchingMembershipUnloadDelay.Get(dc),
TaskQueueInfoByBuildIdTTL: dynamicconfig.TaskQueueInfoByBuildIdTTL.Get(dc),
PriorityLevels: dynamicconfig.MatchingPriorityLevels.Get(dc),
MatchingDropNonRetryableTasks: dynamicconfig.MatchingDropNonRetryableTasks.Get(dc),
MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc),

@@ -293,7 +294,6 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
taskType := tq.TaskType()

return &taskQueueConfig{
CallerInfo: headers.NewBackgroundCallerInfo(ns.String()),
RangeSize: config.RangeSize,
NewMatcher: config.NewMatcher(ns.String(), taskQueueName, taskType),
GetTasksBatchSize: func() int {
@@ -324,6 +324,9 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
MaxTaskDeleteBatchSize: func() int {
return config.MaxTaskDeleteBatchSize(ns.String(), taskQueueName, taskType)
},
PriorityLevels: func() int32 {
return int32(config.PriorityLevels(ns.String(), taskQueueName, taskType))
},
GetUserDataLongPollTimeout: config.GetUserDataLongPollTimeout,
GetUserDataMinWaitTime: 1 * time.Second,
GetUserDataReturnBudget: returnEmptyTaskTimeBudget,
@@ -379,3 +382,7 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
},
}
}

// defaultPriorityLevel returns the middle of the configured priority levels,
// e.g. 3 when there are 5 levels.
func defaultPriorityLevel(priorityLevels int32) int32 {
	return (priorityLevels + 1) / 2
}
70 changes: 65 additions & 5 deletions service/matching/db.go
@@ -51,17 +51,21 @@ type (
taskQueueDB struct {
sync.Mutex
backlogMgr *backlogManagerImpl // accessing taskWriter and taskReader
queue *PhysicalTaskQueueKey
queue SubqueueKey
rangeID int64
ackLevel int64
store persistence.TaskManager
logger log.Logger
approximateBacklogCount atomic.Int64 // note that even though this is an atomic, it should only be written to while holding the db lock
maxReadLevel atomic.Int64 // note that even though this is an atomic, it should only be written to while holding the db lock
// The contents of this slice should be safe for concurrent read access. That means you
// can append to it or replace it with a new slice, but never mutate existing values.
subqueues []*persistencespb.SubqueueKey
}
taskQueueState struct {
rangeID int64
ackLevel int64
rangeID int64
ackLevel int64
subqueues []*persistencespb.SubqueueKey
}
)

@@ -78,7 +82,7 @@ type (
func newTaskQueueDB(
backlogMgr *backlogManagerImpl,
store persistence.TaskManager,
queue *PhysicalTaskQueueKey,
queue SubqueueKey,
logger log.Logger,
) *taskQueueDB {
return &taskQueueDB{
@@ -125,7 +129,11 @@ func (db *taskQueueDB) RenewLease(
return taskQueueState{}, err
}
}
return taskQueueState{rangeID: db.rangeID, ackLevel: db.ackLevel}, nil
return taskQueueState{
rangeID: db.rangeID,
ackLevel: db.ackLevel,
subqueues: db.subqueues,
}, nil
}

func (db *taskQueueDB) takeOverTaskQueueLocked(
@@ -151,9 +159,15 @@ func (db *taskQueueDB) takeOverTaskQueueLocked(
db.ackLevel = response.TaskQueueInfo.AckLevel
db.rangeID = response.RangeID + 1
db.approximateBacklogCount.Store(response.TaskQueueInfo.ApproximateBacklogCount)
if db.isSubqueueZero() {
db.subqueues = db.ensureDefaultSubqueuesLocked(response.TaskQueueInfo.Subqueues)
}
return nil

case *serviceerror.NotFound:
if db.isSubqueueZero() {
db.subqueues = db.ensureDefaultSubqueuesLocked(nil)
}
if _, err := db.store.CreateTaskQueue(ctx, &persistence.CreateTaskQueueRequest{
RangeID: initialRangeID,
TaskQueueInfo: db.cachedQueueInfo(),
@@ -323,6 +337,30 @@ func (db *taskQueueDB) CompleteTasksLessThan(
return n, err
}

func (db *taskQueueDB) AllocateSubqueue(ctx context.Context, priority int32) ([]*persistencespb.SubqueueKey, error) {
bugIf(!db.isSubqueueZero(), "bug: AllocateSubqueue called on non-zero subqueue")

db.Lock()
defer db.Unlock()

prevSubqueues := db.subqueues
db.subqueues = append(db.subqueues, &persistencespb.SubqueueKey{
Priority: priority,
})

// ensure written to metadata before returning
err := db.renewTaskQueueLocked(ctx, db.rangeID)
if err != nil {
// If this was a conflict, caller will shut down partition. Otherwise, we don't know
// for sure if this write made it to persistence or not. We should forget about the new
// subqueue and let a future call to AllocateSubqueue add it again. If we crash and
// reload, the new owner will see the subqueue present, which is fine.
db.subqueues = prevSubqueues
return nil, err
}
return db.subqueues, nil
}

func (db *taskQueueDB) expiryTime() *timestamppb.Timestamp {
switch db.queue.Partition().Kind() {
case enumspb.TASK_QUEUE_KIND_NORMAL:
@@ -344,6 +382,7 @@ func (db *taskQueueDB) cachedQueueInfo() *persistencespb.TaskQueueInfo {
ExpiryTime: db.expiryTime(),
LastUpdateTime: timestamp.TimeNowPtrUtc(),
ApproximateBacklogCount: db.approximateBacklogCount.Load(),
Subqueues: db.subqueues,
}
}

@@ -363,3 +402,24 @@ func (db *taskQueueDB) emitBacklogGauges() {
metrics.TaskLagPerTaskQueueGauge.With(db.backlogMgr.metricsHandler).Record(float64(maxReadLevel - db.ackLevel))
}
}

func (db *taskQueueDB) isSubqueueZero() bool {
return db.queue.subqueue == 0
}

func (db *taskQueueDB) ensureDefaultSubqueuesLocked(subqueues []*persistencespb.SubqueueKey) []*persistencespb.SubqueueKey {
defPriority := defaultPriorityLevel(db.backlogMgr.config.PriorityLevels())
hasDefault := false
	for _, s := range subqueues {
		if s.Priority == defPriority {
			hasDefault = true
			break
		}
	}
if !hasDefault {
subqueues = append(subqueues, &persistencespb.SubqueueKey{
Priority: defPriority,
})
}
return subqueues
}
20 changes: 18 additions & 2 deletions service/matching/matcher_data.go
@@ -111,11 +111,27 @@ func (t *taskPQ) Len() int {
// implements heap.Interface, do not call directly
func (t *taskPQ) Less(i int, j int) bool {
a, b := t.heap[i], t.heap[j]

// poll forwarder is always last
if !a.isPollForwarder && b.isPollForwarder {
return true
} else if a.isPollForwarder && !b.isPollForwarder {
return false
}
// TODO(pri): use priority, task id, etc.
return false

// try priority
ap, bp := a.getPriority(), b.getPriority()
apk, bpk := ap.GetPriorityKey(), bp.GetPriorityKey()
if apk < bpk {
return true
} else if apk > bpk {
return false
}

// Note: sync match tasks have a fixed negative id.
// Query tasks will get 0 here.
aid, bid := a.event.GetTaskId(), b.event.GetTaskId()
return aid < bid
}

// implements heap.Interface, do not call directly
14 changes: 14 additions & 0 deletions service/matching/physical_task_queue_key.go
@@ -62,6 +62,11 @@ type (
// When present, it means this is a V3 pinned queue.
deploymentSeriesName string
}

SubqueueKey struct {
PhysicalTaskQueueKey
subqueue int
}
)

var (
@@ -210,3 +215,12 @@ func (v PhysicalTaskQueueVersion) MetricsTagValue() string {
}
return v.deploymentSeriesName + "/" + v.buildId
}

// PersistenceName returns the name under which this subqueue's tasks are stored:
// subqueue 0 uses the physical queue's persistence name unchanged, while higher
// subqueues append "%<index>" (e.g. a base name my-queue with subqueue 2 becomes my-queue%2).
func (q *SubqueueKey) PersistenceName() string {
	// FIXME: this may be ambiguous, fix before merging
	name := q.PhysicalTaskQueueKey.PersistenceName()
	if q.subqueue > 0 {
		name += "%" + strconv.Itoa(q.subqueue)
	}
	return name
}