Skip to content

Commit

Permalink
Use drop instead of DLQ everywhere but the public config value
Browse files Browse the repository at this point in the history
This is more accurate for the behavior in 1.22.x but I want to use the
same DC property
  • Loading branch information
tdeebswihart committed Feb 1, 2024
1 parent 81e9e98 commit 58b6e6f
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 20 deletions.
2 changes: 1 addition & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ const (
ReplicationEnableDLQMetrics = "history.ReplicationEnableDLQMetrics"
// HistoryTaskDLQInteralErrors causes history task processing to send tasks failing with serviceerror.Internal to
// the dlq (or will drop them if not enabled)
HistoryTaskDLQInternalErrors = "history.TaskDLQInternalErrors"
HistoryTaskDropInternalErrors = "history.TaskDLQInternalErrors"

// ReplicationStreamSyncStatusDuration sync replication status duration
ReplicationStreamSyncStatusDuration = "history.ReplicationStreamSyncStatusDuration"
Expand Down
2 changes: 1 addition & 1 deletion service/history/archival_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
CheckpointInterval: f.Config.ArchivalProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.ArchivalProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.QueueMaxReaderCount,
DLQInternalErrors: f.Config.TaskDLQInternalErrors,
DropInternalErrors: f.Config.TaskDropInternalErrors,
},
f.HostReaderRateLimiter,
logger,
Expand Down
4 changes: 2 additions & 2 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type Config struct {
QueueCriticalSlicesCount dynamicconfig.IntPropertyFn
QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
QueueMaxReaderCount dynamicconfig.IntPropertyFn
TaskDLQInternalErrors dynamicconfig.BoolPropertyFn
TaskDropInternalErrors dynamicconfig.BoolPropertyFn

TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn
TaskSchedulerEnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -381,7 +381,7 @@ func NewConfig(
QueueCriticalSlicesCount: dc.GetIntProperty(dynamicconfig.QueueCriticalSlicesCount, 50),
QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000),
QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2),
TaskDLQInternalErrors: dc.GetBoolProperty(dynamicconfig.HistoryTaskDLQInternalErrors, false),
TaskDropInternalErrors: dc.GetBoolProperty(dynamicconfig.HistoryTaskDropInternalErrors, false),

TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false),
TaskSchedulerEnableRateLimiterShadowMode: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode, true),
Expand Down
12 changes: 6 additions & 6 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type (
lastActiveness bool
resourceExhaustedCount int // does NOT include consts.ErrResourceExhaustedBusyWorkflow
taggedMetricsHandler metrics.Handler
dlqInternalErrors dynamicconfig.BoolPropertyFn
dropInternalErrors dynamicconfig.BoolPropertyFn
}
)

Expand All @@ -146,10 +146,10 @@ func NewExecutable(
clusterMetadata cluster.Metadata,
logger log.Logger,
metricsHandler metrics.Handler,
dlqInternalErrors dynamicconfig.BoolPropertyFn,
dropInternalErrors dynamicconfig.BoolPropertyFn,
) Executable {
if dlqInternalErrors == nil {
dlqInternalErrors = func() bool { return false }
if dropInternalErrors == nil {
dropInternalErrors = func() bool { return false }
}
executable := &executableImpl{
Task: task,
Expand All @@ -172,7 +172,7 @@ func NewExecutable(
),
metricsHandler: metricsHandler,
taggedMetricsHandler: metricsHandler,
dlqInternalErrors: dlqInternalErrors,
dropInternalErrors: dropInternalErrors,
}
executable.updatePriority()
return executable
Expand Down Expand Up @@ -351,7 +351,7 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
if common.IsInternalError(err) {
e.logger.Error("Encountered internal error processing tasks", tag.Error(err))
e.taggedMetricsHandler.Counter(metrics.TaskInternalErrorCounter.GetMetricName()).Record(1)
if e.dlqInternalErrors() {
if e.dropInternalErrors() {
return nil
}
}
Expand Down
10 changes: 5 additions & 5 deletions service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type (
}

params struct {
dlqInternalErrors dynamicconfig.BoolPropertyFn
dropInternalErrors dynamicconfig.BoolPropertyFn
}
option func(*params)
)
Expand Down Expand Up @@ -305,7 +305,7 @@ func (s *executableSuite) TestExecuteHandleErr_Corrupted() {

func (s *executableSuite) TestExecute_DropsInternalErrors_WhenEnabled() {
executable := s.newTestExecutable(func(p *params) {
p.dlqInternalErrors = func() bool { return true }
p.dropInternalErrors = func() bool { return true }
})

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(
Expand All @@ -319,7 +319,7 @@ func (s *executableSuite) TestExecute_DropsInternalErrors_WhenEnabled() {

func (s *executableSuite) TestExecute_DoesntDropInternalErrors_WhenDisabled() {
executable := s.newTestExecutable(func(p *params) {
p.dlqInternalErrors = func() bool { return false }
p.dropInternalErrors = func() bool { return false }
})

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(
Expand Down Expand Up @@ -444,7 +444,7 @@ func (s *executableSuite) TestTaskCancellation() {

func (s *executableSuite) newTestExecutable(opts ...option) Executable {
p := params{
dlqInternalErrors: func() bool { return false },
dropInternalErrors: func() bool { return false },
}
for _, opt := range opts {
opt(&p)
Expand All @@ -469,6 +469,6 @@ func (s *executableSuite) newTestExecutable(opts ...option) Executable {
s.mockClusterMetadata,
log.NewTestLogger(),
metrics.NoopMetricsHandler,
p.dlqInternalErrors,
p.dropInternalErrors,
)
}
4 changes: 2 additions & 2 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type (
CheckpointInterval dynamicconfig.DurationPropertyFn
CheckpointIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
MaxReaderCount dynamicconfig.IntPropertyFn
DLQInternalErrors dynamicconfig.BoolPropertyFn
DropInternalErrors dynamicconfig.BoolPropertyFn
}
)

Expand Down Expand Up @@ -169,7 +169,7 @@ func newQueueBase(
shard.GetClusterMetadata(),
logger,
metricsHandler,
options.DLQInternalErrors,
options.DropInternalErrors,
)
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/timer_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (f *timerQueueFactory) CreateQueue(
CheckpointInterval: f.Config.TimerProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.TimerProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.QueueMaxReaderCount,
DLQInternalErrors: f.Config.TaskDLQInternalErrors,
DropInternalErrors: f.Config.TaskDropInternalErrors,
},
f.HostReaderRateLimiter,
logger,
Expand Down
2 changes: 1 addition & 1 deletion service/history/transfer_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (f *transferQueueFactory) CreateQueue(
CheckpointInterval: f.Config.TransferProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.TransferProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.QueueMaxReaderCount,
DLQInternalErrors: f.Config.TaskDLQInternalErrors,
DropInternalErrors: f.Config.TaskDropInternalErrors,
},
f.HostReaderRateLimiter,
logger,
Expand Down
2 changes: 1 addition & 1 deletion service/history/visibility_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (f *visibilityQueueFactory) CreateQueue(
CheckpointInterval: f.Config.VisibilityProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.VisibilityProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.QueueMaxReaderCount,
DLQInternalErrors: f.Config.TaskDLQInternalErrors,
DropInternalErrors: f.Config.TaskDropInternalErrors,
},
f.HostReaderRateLimiter,
logger,
Expand Down

0 comments on commit 58b6e6f

Please sign in to comment.