From 9c1b52ff11a427163758561c24e5d869c401af62 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Thu, 20 Feb 2025 17:26:50 -0800 Subject: [PATCH] Add heavy subqueues for priority --- api/persistence/v1/tasks.go-helpers.pb.go | 37 +++ api/persistence/v1/tasks.pb.go | 218 ++++++++++++------ common/dynamicconfig/constants.go | 5 + .../server/api/persistence/v1/tasks.proto | 13 ++ service/matching/backlog_manager.go | 9 +- service/matching/config.go | 13 +- service/matching/db.go | 70 +++++- service/matching/matcher_data.go | 20 +- service/matching/physical_task_queue_key.go | 14 ++ .../matching/physical_task_queue_manager.go | 166 +++++++++++-- .../physical_task_queue_manager_interface.go | 1 + service/matching/pri_task_reader.go | 6 +- service/matching/task.go | 9 + service/matching/task_writer.go | 6 + tests/priority_fairness_test.go | 157 +++++++++++++ 15 files changed, 640 insertions(+), 104 deletions(-) create mode 100644 tests/priority_fairness_test.go diff --git a/api/persistence/v1/tasks.go-helpers.pb.go b/api/persistence/v1/tasks.go-helpers.pb.go index 42854850ea2..0abd884807a 100644 --- a/api/persistence/v1/tasks.go-helpers.pb.go +++ b/api/persistence/v1/tasks.go-helpers.pb.go @@ -140,6 +140,43 @@ func (this *TaskQueueInfo) Equal(that interface{}) bool { return proto.Equal(this, that1) } +// Marshal an object of type SubqueueKey to the protobuf v3 wire format +func (val *SubqueueKey) Marshal() ([]byte, error) { + return proto.Marshal(val) +} + +// Unmarshal an object of type SubqueueKey from the protobuf v3 wire format +func (val *SubqueueKey) Unmarshal(buf []byte) error { + return proto.Unmarshal(buf, val) +} + +// Size returns the size of the object, in bytes, once serialized +func (val *SubqueueKey) Size() int { + return proto.Size(val) +} + +// Equal returns whether two SubqueueKey values are equivalent by recursively +// comparing the message's fields. +// For more information see the documentation for +// https://pkg.go.dev/google.golang.org/protobuf/proto#Equal +func (this *SubqueueKey) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + var that1 *SubqueueKey + switch t := that.(type) { + case *SubqueueKey: + that1 = t + case SubqueueKey: + that1 = &t + default: + return false + } + + return proto.Equal(this, that1) +} + // Marshal an object of type TaskKey to the protobuf v3 wire format func (val *TaskKey) Marshal() ([]byte, error) { return proto.Marshal(val) diff --git a/api/persistence/v1/tasks.pb.go b/api/persistence/v1/tasks.pb.go index 9e53c58a10d..44bcb97d440 100644 --- a/api/persistence/v1/tasks.pb.go +++ b/api/persistence/v1/tasks.pb.go @@ -240,6 +240,11 @@ type TaskQueueInfo struct { ExpiryTime *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=expiry_time,json=expiryTime,proto3" json:"expiry_time,omitempty"` LastUpdateTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=last_update_time,json=lastUpdateTime,proto3" json:"last_update_time,omitempty"` ApproximateBacklogCount int64 `protobuf:"varint,8,opt,name=approximate_backlog_count,json=approximateBacklogCount,proto3" json:"approximate_backlog_count,omitempty"` + // Subqueues contains one entry for each subqueue in this physical task queue. + // Tasks are split into subqueues to implement priority and fairness. + // Subqueues are indexed starting from 0, the zero subqueue is always present. + // The message at index n describes the subqueue at index n. 
+ Subqueues []*SubqueueKey `protobuf:"bytes,9,rep,name=subqueues,proto3" json:"subqueues,omitempty"` } func (x *TaskQueueInfo) Reset() { @@ -330,6 +335,61 @@ func (x *TaskQueueInfo) GetApproximateBacklogCount() int64 { return 0 } +func (x *TaskQueueInfo) GetSubqueues() []*SubqueueKey { + if x != nil { + return x.Subqueues + } + return nil +} + +type SubqueueKey struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Each subqueue contains tasks from only one priority level. + Priority int32 `protobuf:"varint,1,opt,name=priority,proto3" json:"priority,omitempty"` +} + +func (x *SubqueueKey) Reset() { + *x = SubqueueKey{} + if protoimpl.UnsafeEnabled { + mi := &file_temporal_server_api_persistence_v1_tasks_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubqueueKey) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubqueueKey) ProtoMessage() {} + +func (x *SubqueueKey) ProtoReflect() protoreflect.Message { + mi := &file_temporal_server_api_persistence_v1_tasks_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubqueueKey.ProtoReflect.Descriptor instead. +func (*SubqueueKey) Descriptor() ([]byte, []int) { + return file_temporal_server_api_persistence_v1_tasks_proto_rawDescGZIP(), []int{3} +} + +func (x *SubqueueKey) GetPriority() int32 { + if x != nil { + return x.Priority + } + return 0 +} + type TaskKey struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -342,7 +402,7 @@ type TaskKey struct { func (x *TaskKey) Reset() { *x = TaskKey{} if protoimpl.UnsafeEnabled { - mi := &file_temporal_server_api_persistence_v1_tasks_proto_msgTypes[3] + mi := &file_temporal_server_api_persistence_v1_tasks_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -355,7 +415,7 @@ func (x *TaskKey) String() string { func (*TaskKey) ProtoMessage() {} func (x *TaskKey) ProtoReflect() protoreflect.Message { - mi := &file_temporal_server_api_persistence_v1_tasks_proto_msgTypes[3] + mi := &file_temporal_server_api_persistence_v1_tasks_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -368,7 +428,7 @@ func (x *TaskKey) ProtoReflect() protoreflect.Message { // Deprecated: Use TaskKey.ProtoReflect.Descriptor instead. 
func (*TaskKey) Descriptor() ([]byte, []int) { - return file_temporal_server_api_persistence_v1_tasks_proto_rawDescGZIP(), []int{3} + return file_temporal_server_api_persistence_v1_tasks_proto_rawDescGZIP(), []int{4} } func (x *TaskKey) GetFireTime() *timestamppb.Timestamp { @@ -405,9 +465,9 @@ var file_temporal_server_api_persistence_v1_tasks_proto_rawDesc = []byte{ 0x6f, 0x1a, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x71, 0x75, 0x65, 0x75, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x76, - 0x0a, 0x11, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x64, 0x54, 0x61, 0x73, 0x6b, 0x49, - 0x6e, 0x66, 0x6f, 0x12, 0x44, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x2c, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, + 0x0a, 0x11, 0x41, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x64, 0x54, 0x61, 0x73, 0x6b, 0x49, 0x6e, + 0x66, 0x6f, 0x12, 0x44, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x2c, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x02, 0x68, 0x00, 0x12, 0x1b, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, @@ -422,12 +482,12 @@ var file_temporal_server_api_persistence_v1_tasks_proto_rawDesc = []byte{ 0x12, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x10, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x42, 0x02, 0x68, 0x00, 0x12, 0x3f, 0x0a, 0x0b, - 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x63, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x3f, 0x0a, 0x0b, 0x65, 0x78, - 0x70, 0x69, 0x72, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x63, 0x72, 0x65, 0x61, + 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x3f, 0x0a, 0x0b, 0x65, 0x78, 0x70, + 0x69, 0x72, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x43, 0x0a, 0x05, 0x63, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, @@ -439,15 
+499,15 @@ var file_temporal_server_api_persistence_v1_tasks_proto_rawDesc = []byte{ 0x76, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x74, 0x61, 0x73, 0x6b, 0x71, 0x75, 0x65, 0x75, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x52, 0x10, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x44, - 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x18, 0x0a, 0x05, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x42, 0x02, 0x68, 0x00, 0x12, 0x40, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, - 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, - 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, - 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x42, - 0x02, 0x68, 0x00, 0x22, 0xbf, 0x03, 0x0a, 0x0d, 0x54, 0x61, 0x73, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65, - 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x25, 0x0a, 0x0c, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, + 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x18, 0x0a, 0x05, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x18, 0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x42, 0x02, 0x68, 0x00, 0x12, 0x40, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, + 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2e, + 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x42, 0x02, + 0x68, 0x00, 0x22, 0x92, 0x04, 0x0a, 0x0d, 0x54, 0x61, 0x73, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x49, + 0x6e, 0x66, 0x6f, 0x12, 0x25, 0x0a, 0x0c, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x49, 0x64, 0x42, 0x02, 0x68, 0x00, 0x12, 0x16, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x45, 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, @@ -456,32 +516,40 @@ var file_temporal_server_api_persistence_v1_tasks_proto_rawDesc = []byte{ 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x3c, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x65, 0x6e, - 0x75, 0x6d, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65, - 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x42, 0x02, 0x68, 0x00, 0x12, 0x1f, 0x0a, - 0x09, 0x61, 0x63, 0x6b, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x08, 0x61, 0x63, 0x6b, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x42, 0x02, 0x68, 0x00, 0x12, 0x3f, 0x0a, 0x0b, - 0x65, 0x78, 0x70, 0x69, 0x72, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 
0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x65, 0x78, 0x70, 0x69, - 0x72, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x48, 0x0a, 0x10, 0x6c, 0x61, 0x73, - 0x74, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x6c, 0x61, - 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, + 0x75, 0x6d, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x4b, + 0x69, 0x6e, 0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x42, 0x02, 0x68, 0x00, 0x12, 0x1f, 0x0a, 0x09, + 0x61, 0x63, 0x6b, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, + 0x61, 0x63, 0x6b, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x42, 0x02, 0x68, 0x00, 0x12, 0x3f, 0x0a, 0x0b, 0x65, + 0x78, 0x70, 0x69, 0x72, 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x65, 0x78, 0x70, 0x69, 0x72, + 0x79, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x48, 0x0a, 0x10, 0x6c, 0x61, 0x73, 0x74, + 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x6c, 0x61, 0x73, + 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x3e, 0x0a, 0x19, 0x61, 0x70, 0x70, 0x72, 0x6f, 0x78, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x5f, 0x62, 0x61, 0x63, 0x6b, 0x6c, 0x6f, 0x67, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x17, 0x61, 0x70, 0x70, 0x72, 0x6f, 0x78, 0x69, 0x6d, 0x61, 0x74, 0x65, 0x42, 0x61, 0x63, 0x6b, - 0x6c, 0x6f, 0x67, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x42, 0x02, 0x68, 0x00, 0x22, 0x63, 0x0a, 0x07, 0x54, - 0x61, 0x73, 0x6b, 0x4b, 0x65, 0x79, 0x12, 0x3b, 0x0a, 0x09, 0x66, 0x69, 0x72, 0x65, 0x5f, 0x74, 0x69, - 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x52, 0x08, 0x66, 0x69, 0x72, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, - 0x12, 0x1b, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, - 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x42, 0x02, 0x68, 0x00, 0x42, 0x36, 0x5a, 0x34, 0x67, - 0x6f, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2e, 0x69, 0x6f, 0x2f, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x6e, - 0x63, 0x65, 0x2f, 0x76, 0x31, 0x3b, 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x63, 0x65, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6c, 0x6f, 0x67, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x42, 0x02, 0x68, 0x00, 0x12, 0x51, 0x0a, 0x09, 0x73, + 0x75, 0x62, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x09, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2f, 0x2e, + 0x74, 0x65, 0x6d, 0x70, 0x6f, 0x72, 0x61, 0x6c, 0x2e, 0x73, 
0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x61, + 0x70, 0x69, 0x2e, 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x63, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x53, 0x75, 0x62, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4b, 0x65, 0x79, 0x52, 0x09, 0x73, 0x75, 0x62, + 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x42, 0x02, 0x68, 0x00, 0x22, 0x2d, 0x0a, 0x0b, 0x53, 0x75, 0x62, + 0x71, 0x75, 0x65, 0x75, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x1e, 0x0a, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, + 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, + 0x74, 0x79, 0x42, 0x02, 0x68, 0x00, 0x22, 0x63, 0x0a, 0x07, 0x54, 0x61, 0x73, 0x6b, 0x4b, 0x65, 0x79, + 0x12, 0x3b, 0x0a, 0x09, 0x66, 0x69, 0x72, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x66, 0x69, + 0x72, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x02, 0x68, 0x00, 0x12, 0x1b, 0x0a, 0x07, 0x74, 0x61, 0x73, + 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, + 0x64, 0x42, 0x02, 0x68, 0x00, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x6f, 0x2e, 0x74, 0x65, 0x6d, 0x70, 0x6f, + 0x72, 0x61, 0x6c, 0x2e, 0x69, 0x6f, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x3b, + 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -496,36 +564,38 @@ func file_temporal_server_api_persistence_v1_tasks_proto_rawDescGZIP() []byte { return file_temporal_server_api_persistence_v1_tasks_proto_rawDescData } -var file_temporal_server_api_persistence_v1_tasks_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_temporal_server_api_persistence_v1_tasks_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_temporal_server_api_persistence_v1_tasks_proto_goTypes = []interface{}{ (*AllocatedTaskInfo)(nil), // 0: temporal.server.api.persistence.v1.AllocatedTaskInfo (*TaskInfo)(nil), // 1: temporal.server.api.persistence.v1.TaskInfo (*TaskQueueInfo)(nil), // 2: temporal.server.api.persistence.v1.TaskQueueInfo - (*TaskKey)(nil), // 3: temporal.server.api.persistence.v1.TaskKey - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp - (*v1.VectorClock)(nil), // 5: temporal.server.api.clock.v1.VectorClock - (*v11.TaskVersionDirective)(nil), // 6: temporal.server.api.taskqueue.v1.TaskVersionDirective - (*v12.Priority)(nil), // 7: temporal.api.common.v1.Priority - (v13.TaskQueueType)(0), // 8: temporal.api.enums.v1.TaskQueueType - (v13.TaskQueueKind)(0), // 9: temporal.api.enums.v1.TaskQueueKind + (*SubqueueKey)(nil), // 3: temporal.server.api.persistence.v1.SubqueueKey + (*TaskKey)(nil), // 4: temporal.server.api.persistence.v1.TaskKey + (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp + (*v1.VectorClock)(nil), // 6: temporal.server.api.clock.v1.VectorClock + (*v11.TaskVersionDirective)(nil), // 7: temporal.server.api.taskqueue.v1.TaskVersionDirective + (*v12.Priority)(nil), // 8: temporal.api.common.v1.Priority + (v13.TaskQueueType)(0), // 9: temporal.api.enums.v1.TaskQueueType + (v13.TaskQueueKind)(0), // 10: temporal.api.enums.v1.TaskQueueKind } var file_temporal_server_api_persistence_v1_tasks_proto_depIdxs = []int32{ 1, // 0: temporal.server.api.persistence.v1.AllocatedTaskInfo.data:type_name -> 
temporal.server.api.persistence.v1.TaskInfo - 4, // 1: temporal.server.api.persistence.v1.TaskInfo.create_time:type_name -> google.protobuf.Timestamp - 4, // 2: temporal.server.api.persistence.v1.TaskInfo.expiry_time:type_name -> google.protobuf.Timestamp - 5, // 3: temporal.server.api.persistence.v1.TaskInfo.clock:type_name -> temporal.server.api.clock.v1.VectorClock - 6, // 4: temporal.server.api.persistence.v1.TaskInfo.version_directive:type_name -> temporal.server.api.taskqueue.v1.TaskVersionDirective - 7, // 5: temporal.server.api.persistence.v1.TaskInfo.priority:type_name -> temporal.api.common.v1.Priority - 8, // 6: temporal.server.api.persistence.v1.TaskQueueInfo.task_type:type_name -> temporal.api.enums.v1.TaskQueueType - 9, // 7: temporal.server.api.persistence.v1.TaskQueueInfo.kind:type_name -> temporal.api.enums.v1.TaskQueueKind - 4, // 8: temporal.server.api.persistence.v1.TaskQueueInfo.expiry_time:type_name -> google.protobuf.Timestamp - 4, // 9: temporal.server.api.persistence.v1.TaskQueueInfo.last_update_time:type_name -> google.protobuf.Timestamp - 4, // 10: temporal.server.api.persistence.v1.TaskKey.fire_time:type_name -> google.protobuf.Timestamp - 11, // [11:11] is the sub-list for method output_type - 11, // [11:11] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 11, // [11:11] is the sub-list for extension extendee - 0, // [0:11] is the sub-list for field type_name + 5, // 1: temporal.server.api.persistence.v1.TaskInfo.create_time:type_name -> google.protobuf.Timestamp + 5, // 2: temporal.server.api.persistence.v1.TaskInfo.expiry_time:type_name -> google.protobuf.Timestamp + 6, // 3: temporal.server.api.persistence.v1.TaskInfo.clock:type_name -> temporal.server.api.clock.v1.VectorClock + 7, // 4: temporal.server.api.persistence.v1.TaskInfo.version_directive:type_name -> temporal.server.api.taskqueue.v1.TaskVersionDirective + 8, // 5: temporal.server.api.persistence.v1.TaskInfo.priority:type_name -> temporal.api.common.v1.Priority + 9, // 6: temporal.server.api.persistence.v1.TaskQueueInfo.task_type:type_name -> temporal.api.enums.v1.TaskQueueType + 10, // 7: temporal.server.api.persistence.v1.TaskQueueInfo.kind:type_name -> temporal.api.enums.v1.TaskQueueKind + 5, // 8: temporal.server.api.persistence.v1.TaskQueueInfo.expiry_time:type_name -> google.protobuf.Timestamp + 5, // 9: temporal.server.api.persistence.v1.TaskQueueInfo.last_update_time:type_name -> google.protobuf.Timestamp + 3, // 10: temporal.server.api.persistence.v1.TaskQueueInfo.subqueues:type_name -> temporal.server.api.persistence.v1.SubqueueKey + 5, // 11: temporal.server.api.persistence.v1.TaskKey.fire_time:type_name -> google.protobuf.Timestamp + 12, // [12:12] is the sub-list for method output_type + 12, // [12:12] is the sub-list for method input_type + 12, // [12:12] is the sub-list for extension type_name + 12, // [12:12] is the sub-list for extension extendee + 0, // [0:12] is the sub-list for field type_name } func init() { file_temporal_server_api_persistence_v1_tasks_proto_init() } @@ -571,6 +641,18 @@ func file_temporal_server_api_persistence_v1_tasks_proto_init() { } } file_temporal_server_api_persistence_v1_tasks_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubqueueKey); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_temporal_server_api_persistence_v1_tasks_proto_msgTypes[4].Exporter = func(v interface{}, i 
int) interface{} { switch v := v.(*TaskKey); i { case 0: return &v.state @@ -589,7 +671,7 @@ func file_temporal_server_api_persistence_v1_tasks_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_temporal_server_api_persistence_v1_tasks_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 5, NumExtensions: 0, NumServices: 0, }, diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 5e6d65f9954..1fdde70ee72 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1266,6 +1266,11 @@ these log lines can be noisy, we want to be able to turn on and sample selective false, `Use priority-enabled TaskMatcher.`, ) + MatchingPriorityLevels = NewTaskQueueIntSetting( + "matching.priorityLevels", + 5, + `Number of simple priority levels`, + ) // keys for history diff --git a/proto/internal/temporal/server/api/persistence/v1/tasks.proto b/proto/internal/temporal/server/api/persistence/v1/tasks.proto index 4ed0ecf3269..59f873776b9 100644 --- a/proto/internal/temporal/server/api/persistence/v1/tasks.proto +++ b/proto/internal/temporal/server/api/persistence/v1/tasks.proto @@ -65,6 +65,19 @@ message TaskQueueInfo { google.protobuf.Timestamp expiry_time = 6; google.protobuf.Timestamp last_update_time = 7; int64 approximate_backlog_count = 8; + // Subqueues contains one entry for each subqueue in this physical task queue. + // Tasks are split into subqueues to implement priority and fairness. + // Subqueues are indexed starting from 0, the zero subqueue is always present. + // The message at index n describes the subqueue at index n. + repeated SubqueueKey subqueues = 9; +} + +message SubqueueKey { + // Each subqueue contains tasks from only one priority level. + int32 priority = 1; + + // // Additionally, tasks may be split by a fairness mechanism into buckets. 
+ // int32 fairness_bucket = 2; } message TaskKey { diff --git a/service/matching/backlog_manager.go b/service/matching/backlog_manager.go index a05daf4d2ea..64c34058dc0 100644 --- a/service/matching/backlog_manager.go +++ b/service/matching/backlog_manager.go @@ -63,6 +63,7 @@ type ( backlogManagerImpl struct { pqMgr physicalTaskQueueManager + subqueue int tqCtx context.Context db *taskQueueDB taskWriter *taskWriter @@ -87,6 +88,7 @@ var _ backlogManager = (*backlogManagerImpl)(nil) func newBacklogManager( tqCtx context.Context, pqMgr physicalTaskQueueManager, + subqueue int, config *taskQueueConfig, taskManager persistence.TaskManager, logger log.Logger, @@ -96,6 +98,7 @@ func newBacklogManager( ) *backlogManagerImpl { bmg := &backlogManagerImpl{ pqMgr: pqMgr, + subqueue: subqueue, tqCtx: tqCtx, matchingClient: matchingClient, metricsHandler: metricsHandler, @@ -104,7 +107,11 @@ func newBacklogManager( config: config, initializedError: future.NewFuture[struct{}](), } - bmg.db = newTaskQueueDB(bmg, taskManager, pqMgr.QueueKey(), logger) + sqkey := SubqueueKey{ + PhysicalTaskQueueKey: *pqMgr.QueueKey(), + subqueue: subqueue, + } + bmg.db = newTaskQueueDB(bmg, taskManager, sqkey, logger) bmg.taskWriter = newTaskWriter(bmg) if config.NewMatcher { bmg.priTaskReader = newPriTaskReader(bmg) diff --git a/service/matching/config.go b/service/matching/config.go index 5d3a36574aa..c5f3c28756a 100644 --- a/service/matching/config.go +++ b/service/matching/config.go @@ -32,7 +32,6 @@ import ( "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" - "go.temporal.io/server/common/headers" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/tqid" ) @@ -94,6 +93,7 @@ type ( QueryWorkflowTaskTimeoutLogRate dynamicconfig.FloatPropertyFnWithTaskQueueFilter MembershipUnloadDelay dynamicconfig.DurationPropertyFn TaskQueueInfoByBuildIdTTL dynamicconfig.DurationPropertyFnWithTaskQueueFilter + PriorityLevels dynamicconfig.IntPropertyFnWithTaskQueueFilter // Time to hold a poll request before returning an empty response if there are no tasks LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskQueueFilter @@ -132,7 +132,6 @@ type ( taskQueueConfig struct { forwarderConfig - CallerInfo headers.CallerInfo SyncMatchWaitDuration func() time.Duration BacklogNegligibleAge func() time.Duration MaxWaitForPollerBeforeFwd func() time.Duration @@ -146,6 +145,7 @@ type ( MaxTaskQueueIdleTime func() time.Duration MinTaskThrottlingBurstSize func() int MaxTaskDeleteBatchSize func() int + PriorityLevels func() int32 GetUserDataLongPollTimeout dynamicconfig.DurationPropertyFn GetUserDataMinWaitTime time.Duration @@ -267,6 +267,7 @@ func NewConfig( QueryWorkflowTaskTimeoutLogRate: dynamicconfig.MatchingQueryWorkflowTaskTimeoutLogRate.Get(dc), MembershipUnloadDelay: dynamicconfig.MatchingMembershipUnloadDelay.Get(dc), TaskQueueInfoByBuildIdTTL: dynamicconfig.TaskQueueInfoByBuildIdTTL.Get(dc), + PriorityLevels: dynamicconfig.MatchingPriorityLevels.Get(dc), MatchingDropNonRetryableTasks: dynamicconfig.MatchingDropNonRetryableTasks.Get(dc), MaxIDLengthLimit: dynamicconfig.MaxIDLengthLimit.Get(dc), @@ -293,7 +294,6 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) * taskType := tq.TaskType() return &taskQueueConfig{ - CallerInfo: headers.NewBackgroundCallerInfo(ns.String()), RangeSize: config.RangeSize, NewMatcher: config.NewMatcher(ns.String(), taskQueueName, taskType), GetTasksBatchSize: func() int { @@ -324,6 +324,9 @@ func 
newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) * MaxTaskDeleteBatchSize: func() int { return config.MaxTaskDeleteBatchSize(ns.String(), taskQueueName, taskType) }, + PriorityLevels: func() int32 { + return int32(config.PriorityLevels(ns.String(), taskQueueName, taskType)) + }, GetUserDataLongPollTimeout: config.GetUserDataLongPollTimeout, GetUserDataMinWaitTime: 1 * time.Second, GetUserDataReturnBudget: returnEmptyTaskTimeBudget, @@ -379,3 +382,7 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) * }, } } + +func defaultPriorityLevel(priorityLevels int32) int32 { + return (priorityLevels + 1) / 2 +} diff --git a/service/matching/db.go b/service/matching/db.go index d7e9170ca79..63af951f349 100644 --- a/service/matching/db.go +++ b/service/matching/db.go @@ -51,17 +51,21 @@ type ( taskQueueDB struct { sync.Mutex backlogMgr *backlogManagerImpl // accessing taskWriter and taskReader - queue *PhysicalTaskQueueKey + queue SubqueueKey rangeID int64 ackLevel int64 store persistence.TaskManager logger log.Logger approximateBacklogCount atomic.Int64 // note that even though this is an atomic, it should only be written to while holding the db lock maxReadLevel atomic.Int64 // note that even though this is an atomic, it should only be written to while holding the db lock + // The contents of this slice should be safe for concurrent read access. That means you + // can append to it or replace it with a new slice, but never mutate existing values. + subqueues []*persistencespb.SubqueueKey } taskQueueState struct { - rangeID int64 - ackLevel int64 + rangeID int64 + ackLevel int64 + subqueues []*persistencespb.SubqueueKey } ) @@ -78,7 +82,7 @@ type ( func newTaskQueueDB( backlogMgr *backlogManagerImpl, store persistence.TaskManager, - queue *PhysicalTaskQueueKey, + queue SubqueueKey, logger log.Logger, ) *taskQueueDB { return &taskQueueDB{ @@ -125,7 +129,11 @@ func (db *taskQueueDB) RenewLease( return taskQueueState{}, err } } - return taskQueueState{rangeID: db.rangeID, ackLevel: db.ackLevel}, nil + return taskQueueState{ + rangeID: db.rangeID, + ackLevel: db.ackLevel, + subqueues: db.subqueues, + }, nil } func (db *taskQueueDB) takeOverTaskQueueLocked( @@ -151,9 +159,15 @@ func (db *taskQueueDB) takeOverTaskQueueLocked( db.ackLevel = response.TaskQueueInfo.AckLevel db.rangeID = response.RangeID + 1 db.approximateBacklogCount.Store(response.TaskQueueInfo.ApproximateBacklogCount) + if db.isSubqueueZero() { + db.subqueues = db.ensureDefaultSubqueuesLocked(response.TaskQueueInfo.Subqueues) + } return nil case *serviceerror.NotFound: + if db.isSubqueueZero() { + db.subqueues = db.ensureDefaultSubqueuesLocked(nil) + } if _, err := db.store.CreateTaskQueue(ctx, &persistence.CreateTaskQueueRequest{ RangeID: initialRangeID, TaskQueueInfo: db.cachedQueueInfo(), @@ -323,6 +337,30 @@ func (db *taskQueueDB) CompleteTasksLessThan( return n, err } +func (db *taskQueueDB) AllocateSubqueue(ctx context.Context, priority int32) ([]*persistencespb.SubqueueKey, error) { + bugIf(!db.isSubqueueZero(), "bug: AllocateSubqueue called on non-zero subqueue") + + db.Lock() + defer db.Unlock() + + prevSubqueues := db.subqueues + db.subqueues = append(db.subqueues, &persistencespb.SubqueueKey{ + Priority: priority, + }) + + // ensure written to metadata before returning + err := db.renewTaskQueueLocked(ctx, db.rangeID) + if err != nil { + // If this was a conflict, caller will shut down partition. 
Otherwise, we don't know + // for sure if this write made it to persistence or not. We should forget about the new + // subqueue and let a future call to AllocateSubqueue add it again. If we crash and + // reload, the new owner will see the subqueue present, which is fine. + db.subqueues = prevSubqueues + return nil, err + } + return db.subqueues, nil +} + func (db *taskQueueDB) expiryTime() *timestamppb.Timestamp { switch db.queue.Partition().Kind() { case enumspb.TASK_QUEUE_KIND_NORMAL: @@ -344,6 +382,7 @@ func (db *taskQueueDB) cachedQueueInfo() *persistencespb.TaskQueueInfo { ExpiryTime: db.expiryTime(), LastUpdateTime: timestamp.TimeNowPtrUtc(), ApproximateBacklogCount: db.approximateBacklogCount.Load(), + Subqueues: db.subqueues, } } @@ -363,3 +402,24 @@ func (db *taskQueueDB) emitBacklogGauges() { metrics.TaskLagPerTaskQueueGauge.With(db.backlogMgr.metricsHandler).Record(float64(maxReadLevel - db.ackLevel)) } } + +func (db *taskQueueDB) isSubqueueZero() bool { + return db.queue.subqueue == 0 +} + +func (db *taskQueueDB) ensureDefaultSubqueuesLocked(subqueues []*persistencespb.SubqueueKey) []*persistencespb.SubqueueKey { + defPriority := defaultPriorityLevel(db.backlogMgr.config.PriorityLevels()) + hasDefault := false + for _, s := range db.subqueues { + hasDefault = s.Priority == defPriority + if hasDefault { + break + } + } + if !hasDefault { + subqueues = append(subqueues, &persistencespb.SubqueueKey{ + Priority: defPriority, + }) + } + return subqueues +} diff --git a/service/matching/matcher_data.go b/service/matching/matcher_data.go index 361770f4cef..d2ebb744c7a 100644 --- a/service/matching/matcher_data.go +++ b/service/matching/matcher_data.go @@ -111,11 +111,27 @@ func (t *taskPQ) Len() int { // implements heap.Interface, do not call directly func (t *taskPQ) Less(i int, j int) bool { a, b := t.heap[i], t.heap[j] + + // poll forwarder is always last if !a.isPollForwarder && b.isPollForwarder { return true + } else if a.isPollForwarder && !b.isPollForwarder { + return false } - // TODO(pri): use priority, task id, etc. - return false + + // try priority + ap, bp := a.getPriority(), b.getPriority() + apk, bpk := ap.GetPriorityKey(), bp.GetPriorityKey() + if apk < bpk { + return true + } else if apk > bpk { + return false + } + + // Note: sync match tasks have a fixed negative id. + // Query tasks will get 0 here. + aid, bid := a.event.GetTaskId(), b.event.GetTaskId() + return aid < bid } // implements heap.Interface, do not call directly diff --git a/service/matching/physical_task_queue_key.go b/service/matching/physical_task_queue_key.go index 4fdcee17df9..0bd7734e547 100644 --- a/service/matching/physical_task_queue_key.go +++ b/service/matching/physical_task_queue_key.go @@ -62,6 +62,11 @@ type ( // When present, it means this is a V3 pinned queue. 
deploymentSeriesName string } + + SubqueueKey struct { + PhysicalTaskQueueKey + subqueue int + } ) var ( @@ -210,3 +215,12 @@ func (v PhysicalTaskQueueVersion) MetricsTagValue() string { } return v.deploymentSeriesName + "/" + v.buildId } + +func (q *SubqueueKey) PersistenceName() string { + // FIXME: this may be ambiguous, fix before merging + name := q.PhysicalTaskQueueKey.PersistenceName() + if q.subqueue > 0 { + name += "%" + strconv.Itoa(q.subqueue) + } + return name +} diff --git a/service/matching/physical_task_queue_manager.go b/service/matching/physical_task_queue_manager.go index 1a9d990f30f..255cffe506f 100644 --- a/service/matching/physical_task_queue_manager.go +++ b/service/matching/physical_task_queue_manager.go @@ -76,17 +76,25 @@ type ( taskInfo *persistencespb.TaskInfo forwardInfo *taskqueuespb.TaskForwardInfo } - // physicalTaskQueueManagerImpl manages a single DB-level (aka physical) task queue in memory + // physicalTaskQueueManagerImpl manages a set of physical queues that comprise one logical + // queue, corresponding to a single versioned queue of a task queue partition. + // TODO(pri): rename this physicalTaskQueueManagerImpl struct { status int32 partitionMgr *taskQueuePartitionManagerImpl queue *PhysicalTaskQueueKey config *taskQueueConfig - backlogMgr *backlogManagerImpl + // This context is valid for lifetime of this physicalTaskQueueManagerImpl. // It can be used to notify when the task queue is closing. - tqCtx context.Context - tqCtxCancel context.CancelFunc + tqCtx context.Context + tqCtxCancel context.CancelFunc + + backlogLock sync.Mutex + backlogs []*backlogManagerImpl // backlog managers are 1:1 with subqueues + backlogByPriority map[int32]*backlogManagerImpl + backlog0 *backlogManagerImpl // this is == backlogs[0] but does not require the lock to read + liveness *liveness oldMatcher *TaskMatcher // TODO(pri): old matcher cleanup priMatcher *priTaskMatcher @@ -157,6 +165,7 @@ func newPhysicalTaskQueueManager( config: config, tqCtx: tqCtx, tqCtxCancel: tqCancel, + backlogByPriority: make(map[int32]*backlogManagerImpl), namespaceRegistry: e.namespaceRegistry, matchingClient: e.matchingRawClient, clusterMeta: e.clusterMeta, @@ -181,9 +190,10 @@ func newPhysicalTaskQueueManager( pqMgr.namespaceRegistry, pqMgr.partitionMgr.engine.historyClient, ) - pqMgr.backlogMgr = newBacklogManager( + pqMgr.backlog0 = newBacklogManager( tqCtx, pqMgr, + 0, config, e.taskManager, logger, @@ -191,6 +201,7 @@ func newPhysicalTaskQueueManager( e.matchingRawClient, taggedMetricsHandler, ) + pqMgr.backlogs = []*backlogManagerImpl{pqMgr.backlog0} if config.NewMatcher { var fwdr *priForwarder @@ -232,7 +243,7 @@ func (c *physicalTaskQueueManagerImpl) Start() { return } c.liveness.Start() - c.backlogMgr.Start() + c.backlog0.Start() // this will call LoadSubqueues after initializing c.matcher.Start() c.logger.Info("Started physicalTaskQueueManager", tag.LifeCycleStarted, tag.Cause(c.config.loadCause.String())) c.metricsHandler.Counter(metrics.TaskQueueStartedCounter.Name()).Record(1) @@ -249,8 +260,12 @@ func (c *physicalTaskQueueManagerImpl) Stop(unloadCause unloadCause) { ) { return } - // this may attempt to write one final ack update, do this before canceling tqCtx - c.backlogMgr.Stop() + c.backlogLock.Lock() + for _, b := range c.backlogs { + // this may attempt to write one final ack update, do this before canceling tqCtx + b.Stop() + } + c.backlogLock.Unlock() c.matcher.Stop() c.liveness.Stop() c.tqCtxCancel() @@ -260,12 +275,93 @@ func (c *physicalTaskQueueManagerImpl) 
Stop(unloadCause unloadCause) { } func (c *physicalTaskQueueManagerImpl) WaitUntilInitialized(ctx context.Context) error { - return c.backlogMgr.WaitUntilInitialized(ctx) + return c.backlog0.WaitUntilInitialized(ctx) +} + +// LoadSubqueues is called once on startup and then again each time the set of subqueues changes. +func (c *physicalTaskQueueManagerImpl) LoadSubqueues(subqueues []*persistencespb.SubqueueKey) { + if !c.config.NewMatcher { + return + } + + c.backlogLock.Lock() + defer c.backlogLock.Unlock() + + c.loadSubqueuesLocked(subqueues) +} + +func (c *physicalTaskQueueManagerImpl) loadSubqueuesLocked(subqueues []*persistencespb.SubqueueKey) { + // TODO(pri): This assumes that subqueues never shrinks, and priority/fairness index of + // existing subqueues never changes. If we change that, this logic will need to change. + for i, s := range subqueues { + if i >= len(c.backlogs) { + b := newBacklogManager( + c.tqCtx, + c, + i, + c.config, + c.partitionMgr.engine.taskManager, + c.logger, + c.throttledLogger, + c.partitionMgr.engine.matchingRawClient, + c.metricsHandler, + ) + b.Start() + c.backlogs = append(c.backlogs, b) + } + c.backlogByPriority[s.Priority] = c.backlogs[i] + } +} + +func (c *physicalTaskQueueManagerImpl) getBacklogForPriority(priority int32) *backlogManagerImpl { + if !c.config.NewMatcher { + return c.backlog0 + } + + levels := c.config.PriorityLevels() + if priority == 0 { + priority = defaultPriorityLevel(levels) + } + if priority < 1 { + // this should have been rejected much earlier, but just clip it here + priority = 1 + } else if priority > int32(levels) { + priority = int32(levels) + } + + c.backlogLock.Lock() + defer c.backlogLock.Unlock() + + if b, ok := c.backlogByPriority[priority]; ok { + return b + } + + // We need to allocate a new subqueue. Note this is doing io under backlogLock, + // but we want to serialize these updates. + // TODO(pri): maybe we can improve that + subqueues, err := c.backlog0.db.AllocateSubqueue(c.tqCtx, priority) + if err != nil { + c.backlog0.signalIfFatal(err) + // If we failed to write the metadata update, just use backlog0. If err was a + // fatal error (most likely case), the subsequent call to SpoolTask will fail. + return c.backlog0 + } + + c.loadSubqueuesLocked(subqueues) + + // this should be here now + if b, ok := c.backlogByPriority[priority]; ok { + return b + } + + // if something went wrong, return 0 + return c.backlog0 } func (c *physicalTaskQueueManagerImpl) SpoolTask(taskInfo *persistencespb.TaskInfo) error { c.liveness.markAlive() - return c.backlogMgr.SpoolTask(taskInfo) + b := c.getBacklogForPriority(taskInfo.Priority.GetPriorityKey()) + return b.SpoolTask(taskInfo) } // PollTask blocks waiting for a task. @@ -332,7 +428,7 @@ func (c *physicalTaskQueueManagerImpl) PollTask( } task.namespace = c.partitionMgr.ns.Name() - task.backlogCountHint = c.backlogMgr.BacklogCountHint + task.backlogCountHint = c.backlogCountHint if pollMetadata.forwardedFrom == "" && // only track the original polls, not forwarded ones. 
(!task.isStarted() || !task.started.hasEmptyResponse()) { // Need to filter out the empty "started" ones @@ -342,6 +438,16 @@ func (c *physicalTaskQueueManagerImpl) PollTask( } } +func (c *physicalTaskQueueManagerImpl) backlogCountHint() (total int64) { + c.backlogLock.Lock() + defer c.backlogLock.Unlock() + + for _, b := range c.backlogs { + total += b.BacklogCountHint() + } + return +} + func (c *physicalTaskQueueManagerImpl) MarkAlive() { c.liveness.markAlive() } @@ -456,29 +562,45 @@ func (c *physicalTaskQueueManagerImpl) LegacyDescribeTaskQueue(includeTaskQueueS }, } if includeTaskQueueStatus { - response.DescResponse.TaskQueueStatus = c.backlogMgr.BacklogStatus() + // TODO(pri): return only backlog 0, need a new api to include info for all subqueues + response.DescResponse.TaskQueueStatus = c.backlog0.BacklogStatus() response.DescResponse.TaskQueueStatus.RatePerSecond = c.matcher.Rate() } return response } func (c *physicalTaskQueueManagerImpl) GetStats() *taskqueuepb.TaskQueueStats { + c.backlogLock.Lock() + defer c.backlogLock.Unlock() + + var approxCount int64 + var maxAge time.Duration + for _, b := range c.backlogs { + approxCount += b.db.getApproximateBacklogCount() + // using this and not matcher's because it reports only the age of the current physical + // queue backlog (not including the redirected backlogs) which is consistent with the + // ApproximateBacklogCount metric. + maxAge = max(maxAge, b.BacklogHeadAge()) + } return &taskqueuepb.TaskQueueStats{ - ApproximateBacklogCount: c.backlogMgr.db.getApproximateBacklogCount(), - ApproximateBacklogAge: durationpb.New(c.backlogMgr.BacklogHeadAge()), // using this and not matcher's - // because it reports only the age of the current physical queue backlog (not including the redirected backlogs) which is consistent - // with the ApproximateBacklogCount metric. 
- TasksAddRate: c.tasksAddedInIntervals.rate(), - TasksDispatchRate: c.tasksDispatchedInIntervals.rate(), + ApproximateBacklogCount: approxCount, + ApproximateBacklogAge: durationpb.New(maxAge), + TasksAddRate: c.tasksAddedInIntervals.rate(), + TasksDispatchRate: c.tasksDispatchedInIntervals.rate(), } } func (c *physicalTaskQueueManagerImpl) GetInternalTaskQueueStatus() *taskqueuespb.InternalTaskQueueStatus { + // TODO(pri): return only backlog 0, need a new api to include info for all subqueues + b := c.backlog0 return &taskqueuespb.InternalTaskQueueStatus{ - ReadLevel: c.backlogMgr.taskAckManager.getReadLevel(), - AckLevel: c.backlogMgr.taskAckManager.getAckLevel(), - TaskIdBlock: &taskqueuepb.TaskIdBlock{StartId: c.backlogMgr.taskWriter.taskIDBlock.start, EndId: c.backlogMgr.taskWriter.taskIDBlock.end}, - ReadBufferLength: c.backlogMgr.ReadBufferLength(), + ReadLevel: b.taskAckManager.getReadLevel(), + AckLevel: b.taskAckManager.getAckLevel(), + TaskIdBlock: &taskqueuepb.TaskIdBlock{ + StartId: b.taskWriter.taskIDBlock.start, + EndId: b.taskWriter.taskIDBlock.end, + }, + ReadBufferLength: b.ReadBufferLength(), } } diff --git a/service/matching/physical_task_queue_manager_interface.go b/service/matching/physical_task_queue_manager_interface.go index f3943e7a17d..fcc29882064 100644 --- a/service/matching/physical_task_queue_manager_interface.go +++ b/service/matching/physical_task_queue_manager_interface.go @@ -41,6 +41,7 @@ type ( Start() Stop(unloadCause) WaitUntilInitialized(context.Context) error + LoadSubqueues([]*persistencespb.SubqueueKey) // PollTask blocks waiting for a task Returns error when context deadline is exceeded // maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched // from this task queue to pollers diff --git a/service/matching/pri_task_reader.go b/service/matching/pri_task_reader.go index 80b5c85ac0f..876d5c0fafe 100644 --- a/service/matching/pri_task_reader.go +++ b/service/matching/pri_task_reader.go @@ -195,7 +195,7 @@ Loop: tr.Signal() case <-updateAckTicker.C: - err := tr.persistAckBacklogCountLevel(ctx) + err := tr.persistAckBacklogCountLevel() isConditionFailed := tr.backlogMgr.signalIfFatal(err) if err != nil && !isConditionFailed { tr.logger().Error("Persistent store operation failure", @@ -350,9 +350,9 @@ func (tr *priTaskReader) retryAddAfterError(task *internalTask) { ) } -func (tr *priTaskReader) persistAckBacklogCountLevel(ctx context.Context) error { +func (tr *priTaskReader) persistAckBacklogCountLevel() error { ackLevel := tr.backlogMgr.taskAckManager.getAckLevel() - return tr.backlogMgr.db.UpdateState(ctx, ackLevel) + return tr.backlogMgr.db.UpdateState(tr.backlogMgr.tqCtx, ackLevel) } func (tr *priTaskReader) logger() log.Logger { diff --git a/service/matching/task.go b/service/matching/task.go index da3f7e3d812..7d80ce1f6a0 100644 --- a/service/matching/task.go +++ b/service/matching/task.go @@ -263,6 +263,15 @@ func (task *internalTask) getResponse() (taskResponse, bool) { return <-task.responseC, true } +func (task *internalTask) getPriority() *commonpb.Priority { + if task.event != nil { + return task.event.AllocatedTaskInfo.GetData().GetPriority() + } else if task.query != nil { + return task.query.request.GetPriority() + } + return nil +} + // finish marks a task as finished. Should be called after a poller picks up a task // and marks it as started. If the task is unable to marked as started, then this // method should be called with a non-nil error argument. 
diff --git a/service/matching/task_writer.go b/service/matching/task_writer.go index 855c3b68a64..4b85527da3e 100644 --- a/service/matching/task_writer.go +++ b/service/matching/task_writer.go @@ -111,6 +111,12 @@ func (w *taskWriter) initReadWriteState() error { w.taskIDBlock = rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize) w.backlogMgr.db.SetMaxReadLevel(w.taskIDBlock.start - 1) w.backlogMgr.taskAckManager.setAckLevel(state.ackLevel) + + // if this is subqueue zero, report initial subqueue metadata + if w.backlogMgr.db.isSubqueueZero() { + w.backlogMgr.pqMgr.LoadSubqueues(state.subqueues) + } + return nil } diff --git a/tests/priority_fairness_test.go b/tests/priority_fairness_test.go new file mode 100644 index 00000000000..9301da1c53e --- /dev/null +++ b/tests/priority_fairness_test.go @@ -0,0 +1,157 @@ +// The MIT License +// +// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package tests + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + enumspb "go.temporal.io/api/enums/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/tests/testcore" +) + +type PriorityFairnessSuite struct { + testcore.FunctionalTestSdkSuite +} + +func TestPriorityFairnessSuite(t *testing.T) { + t.Parallel() + suite.Run(t, new(PriorityFairnessSuite)) +} + +func (s *PriorityFairnessSuite) SetupSuite() { + // minimize workflow task backlog to make it easier to debug tests + s.WorkerOptions.MaxConcurrentWorkflowTaskPollers = 20 + + dynamicConfigOverrides := map[dynamicconfig.Key]any{ + dynamicconfig.MatchingUseNewMatcher.Key(): true, + dynamicconfig.MatchingGetTasksBatchSize.Key(): 20, + } + s.FunctionalTestSdkSuite.SetupSuiteWithDefaultCluster(testcore.WithDynamicConfigOverrides(dynamicConfigOverrides)) +} + +func (s *PriorityFairnessSuite) TestPriority_Activity_Basic() { + const N = 100 + const Levels = 5 + actq := testcore.RandomizeStr("actq") + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + var runs []int + + act1 := func(ctx context.Context, wfidx, pri int) error { + runs = append(runs, pri) + // activity.GetLogger(ctx).Info("activity", "pri", pri, "wfidx", wfidx) + return nil + } + + wf1 := func(ctx workflow.Context, wfidx int) error { + var futures []workflow.Future + for _, pri := range rand.Perm(Levels) { + actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + TaskQueue: actq, + ScheduleToCloseTimeout: 60 * time.Second, + Priority: temporal.Priority{PriorityKey: pri + 1}, + }) + f := workflow.ExecuteActivity(actCtx, act1, wfidx, pri+1) + futures = append(futures, f) + } + for _, f := range futures { + s.NoError(f.Get(ctx, nil)) + } + return nil + } + s.Worker().RegisterWorkflow(wf1) + + var execs []client.WorkflowRun + wfopts := client.StartWorkflowOptions{ + TaskQueue: s.TaskQueue(), + } + for wfidx := range N { + exec, err := s.SdkClient().ExecuteWorkflow(ctx, wfopts, wf1, wfidx) + s.NoError(err) + execs = append(execs, exec) + } + + // wait for activity queue to build up backlog + s.T().Log("waiting for backlog") + s.waitForBacklog(ctx, actq, enumspb.TASK_QUEUE_TYPE_ACTIVITY, N*Levels) + + // now let activities run + s.T().Log("starting worker") + actw := worker.New(s.SdkClient(), actq, worker.Options{ + MaxConcurrentActivityExecutionSize: 1, // serialize activities + }) + actw.RegisterActivity(act1) + actw.Start() + defer actw.Stop() + + // wait on all workflows to clean up + for _, exec := range execs { + s.NoError(exec.Get(ctx, nil)) + } + + w := wrongorderness(runs) + s.T().Log("wrongorderness:", w) + s.Less(w, 0.15) +} + +func (s *PriorityFairnessSuite) waitForBacklog(ctx context.Context, tq string, tp enumspb.TaskQueueType, n int) { + s.EventuallyWithT(func(t *assert.CollectT) { + resp, err := s.FrontendClient().DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: tq}, + ApiMode: enumspb.DESCRIBE_TASK_QUEUE_MODE_ENHANCED, + TaskQueueTypes: []enumspb.TaskQueueType{tp}, + ReportStats: true, + }) + assert.NoError(t, err) + stats := resp.GetVersionsInfo()[""].TypesInfo[int32(tp)].Stats + 
assert.GreaterOrEqual(t, stats.ApproximateBacklogCount, int64(n)) + }, 10*time.Second, 200*time.Millisecond) +} + +func wrongorderness(vs []int) float64 { + l := len(vs) + wrong := 0 + for i, v := range vs[:l-1] { + for _, w := range vs[i+1:] { + if v > w { + wrong++ + } + } + } + return float64(wrong) / float64(l*(l-1)/2) +}
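
---

A minimal standalone sketch (not part of the patch) of how a task's priority key maps to a subqueue priority level, mirroring the defaultPriorityLevel helper added in service/matching/config.go and the clipping logic in getBacklogForPriority above. The function and variable names here are illustrative only; the 5-level default matches the matching.priorityLevels dynamic config value introduced in this change.

package main

import "fmt"

// defaultPriorityLevel mirrors the helper added in service/matching/config.go:
// with 5 levels, the default is level 3 (the middle level).
func defaultPriorityLevel(priorityLevels int32) int32 {
	return (priorityLevels + 1) / 2
}

// effectivePriority mirrors the clipping in getBacklogForPriority: a priority of 0
// means "unset" and falls back to the default level; out-of-range values are clipped
// into [1, levels].
func effectivePriority(priority, levels int32) int32 {
	if priority == 0 {
		priority = defaultPriorityLevel(levels)
	}
	if priority < 1 {
		priority = 1
	} else if priority > levels {
		priority = levels
	}
	return priority
}

func main() {
	const levels = 5 // matches the matching.priorityLevels default in this patch
	for _, p := range []int32{0, 1, 3, 7, -2} {
		// prints: 0 -> 3, 1 -> 1, 3 -> 3, 7 -> 5, -2 -> 1
		fmt.Printf("requested %d -> subqueue priority %d\n", p, effectivePriority(p, levels))
	}
}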