Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport code to drop internal errors encountered during task processing #5385

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,9 @@ const (
ReplicationBypassCorruptedData = "history.ReplicationBypassCorruptedData"
// ReplicationEnableDLQMetrics is the flag to emit DLQ metrics
ReplicationEnableDLQMetrics = "history.ReplicationEnableDLQMetrics"
// HistoryTaskDropInternalErrors causes history task processing to send tasks failing with serviceerror.Internal to
// the DLQ (or to drop them if the DLQ is not enabled)
HistoryTaskDropInternalErrors = "history.TaskDLQInternalErrors"

// ReplicationStreamSyncStatusDuration sync replication status duration
ReplicationStreamSyncStatusDuration = "history.ReplicationStreamSyncStatusDuration"
Expand Down
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,7 @@ var (
)
TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter")
TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover")
TaskInternalErrorCounter = NewCounterDef("task_errors_internal")
TaskThrottledCounter = NewCounterDef(
"task_errors_throttled",
WithDescription("The number of history task processing errors caused by resource exhausted errors, excluding workflow busy case."),
Expand Down
1 change: 1 addition & 0 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
mockMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
err := executable.Execute()
if len(p.ExpectedErrorSubstrings) > 0 {
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type Config struct {
QueueCriticalSlicesCount dynamicconfig.IntPropertyFn
QueuePendingTaskMaxCount dynamicconfig.IntPropertyFn
QueueMaxReaderCount dynamicconfig.IntPropertyFn
TaskDropInternalErrors dynamicconfig.BoolPropertyFn

TaskSchedulerEnableRateLimiter dynamicconfig.BoolPropertyFn
TaskSchedulerEnableRateLimiterShadowMode dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -380,6 +381,7 @@ func NewConfig(
QueueCriticalSlicesCount: dc.GetIntProperty(dynamicconfig.QueueCriticalSlicesCount, 50),
QueuePendingTaskMaxCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskMaxCount, 10000),
QueueMaxReaderCount: dc.GetIntProperty(dynamicconfig.QueueMaxReaderCount, 2),
TaskDropInternalErrors: dc.GetBoolProperty(dynamicconfig.HistoryTaskDropInternalErrors, false),

TaskSchedulerEnableRateLimiter: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiter, false),
TaskSchedulerEnableRateLimiterShadowMode: dc.GetBoolProperty(dynamicconfig.TaskSchedulerEnableRateLimiterShadowMode, true),
Expand Down
14 changes: 14 additions & 0 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -129,6 +130,7 @@ type (
lastActiveness bool
resourceExhaustedCount int // does NOT include consts.ErrResourceExhaustedBusyWorkflow
taggedMetricsHandler metrics.Handler
dropInternalErrors dynamicconfig.BoolPropertyFn
}
)

Expand All @@ -144,7 +146,11 @@ func NewExecutable(
clusterMetadata cluster.Metadata,
logger log.Logger,
metricsHandler metrics.Handler,
dropInternalErrors dynamicconfig.BoolPropertyFn,
) Executable {
if dropInternalErrors == nil {
dropInternalErrors = func() bool { return false }
}
executable := &executableImpl{
Task: task,
state: ctasks.TaskStatePending,
Expand All @@ -166,6 +172,7 @@ func NewExecutable(
),
metricsHandler: metricsHandler,
taggedMetricsHandler: metricsHandler,
dropInternalErrors: dropInternalErrors,
}
executable.updatePriority()
return executable
Expand Down Expand Up @@ -341,6 +348,13 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
e.logger.Error("Drop task due to serialization error", tag.Error(err))
return nil
}
if common.IsInternalError(err) {
e.logger.Error("Encountered internal error processing tasks", tag.Error(err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this log.Error be inside the `if e.dropInternalErrors()` block? If you are not returning, you log the error anyway on line 361.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Looks like we missed this in my patch to v1.23 as well. I'll follow up with 1.23 to fix that when I next have a chance

Copy link
Member

@yycptt yycptt Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea here is that even when dropInternalErrors is false, we still log and emit a metric for the internal errors, so that we can be confident they are safe to drop and then enable the flag. It's basically a shadow mode.

Having this log outside dropInternalErrors makes it easier to filter logs and only look at internal error logs. With only the log on L361, I don't think there's a good way to audit just the internal errors. The Error() method of an InternalError doesn't say it's an internal error 🤦.

We can add extra tags, if double logging is a concern. The volume of InternalError should be ~0 though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add a tags.ErrorType tag to the main log message here. That'd give us a single log that handles both cases.

I'm going to do so in a separate PR and port over the code from https://github.com/temporalio/temporal/pull/5234/files#diff-3045f1928c46472037eb67e844c90f8745e01cd321e058ba212e3ea1a6b5147fR51 to unwrap things like fmt.wrapErr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put up a PR at #5400.

e.taggedMetricsHandler.Counter(metrics.TaskInternalErrorCounter.GetMetricName()).Record(1)
if e.dropInternalErrors() {
return nil
}
}

e.taggedMetricsHandler.Counter(metrics.TaskFailures.GetMetricName()).Record(1)

Expand Down
43 changes: 42 additions & 1 deletion service/history/queues/executable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
Expand All @@ -65,6 +66,11 @@ type (

timeSource *clock.EventTimeSource
}

params struct {
dropInternalErrors dynamicconfig.BoolPropertyFn
}
option func(*params)
)

func TestExecutableSuite(t *testing.T) {
Expand Down Expand Up @@ -297,6 +303,34 @@ func (s *executableSuite) TestExecuteHandleErr_Corrupted() {
s.NoError(executable.HandleErr(err))
}

func (s *executableSuite) TestExecute_DropsInternalErrors_WhenEnabled() {
executable := s.newTestExecutable(func(p *params) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call it dropErrorsExecutable

p.dropInternalErrors = func() bool { return true }
})

s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(
func(_ context.Context, _ Executable) ([]metrics.Tag, bool, error) {
panic(serviceerror.NewInternal("injected error"))
},
)

s.NoError(executable.HandleErr(executable.Execute()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would introduce a variable for the executable.HandleErr result. This way the action will be separated from the assertion.

}

// TestExecute_DoesntDropInternalErrors_WhenDisabled checks that, when the
// dropInternalErrors flag returns false, an internal error raised (via panic)
// by the mocked executor is still reported by HandleErr as an error.
func (s *executableSuite) TestExecute_DoesntDropInternalErrors_WhenDisabled() {
	keepInternalErrors := func(cfg *params) {
		cfg.dropInternalErrors = func() bool { return false }
	}
	executable := s.newTestExecutable(keepInternalErrors)

	// The mocked executor panics with a serviceerror.Internal value.
	panicWithInternal := func(_ context.Context, _ Executable) ([]metrics.Tag, bool, error) {
		panic(serviceerror.NewInternal("injected error"))
	}
	s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).DoAndReturn(panicWithInternal)

	// Keep the action separate from the assertion.
	execErr := executable.Execute()
	s.Error(executable.HandleErr(execErr))
}

func (s *executableSuite) TestHandleErr_EntityNotExists() {
executable := s.newTestExecutable()

Expand Down Expand Up @@ -408,7 +442,13 @@ func (s *executableSuite) TestTaskCancellation() {
s.False(executable.IsRetryableError(errors.New("some random error")))
}

func (s *executableSuite) newTestExecutable() Executable {
func (s *executableSuite) newTestExecutable(opts ...option) Executable {
p := params{
dropInternalErrors: func() bool { return false },
}
for _, opt := range opts {
opt(&p)
}
return NewExecutable(
DefaultReaderId,
tasks.NewFakeTask(
Expand All @@ -429,5 +469,6 @@ func (s *executableSuite) newTestExecutable() Executable {
s.mockClusterMetadata,
log.NewTestLogger(),
metrics.NoopMetricsHandler,
p.dropInternalErrors,
)
}
1 change: 1 addition & 0 deletions service/history/queues/memory_scheduled_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (s *memoryScheduledQueueSuite) newSpeculativeWorkflowTaskTimeoutTestExecuta
nil,
nil,
nil,
func() bool { return false },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is "drop internal errors", right?

),
wttt,
)
Expand Down
1 change: 1 addition & 0 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func newQueueBase(
shard.GetClusterMetadata(),
logger,
metricsHandler,
shard.GetConfig().TaskDropInternalErrors,
)
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *readerSuite) SetupTest() {
s.metricsHandler = metrics.NoopMetricsHandler

s.executableInitializer = func(readerID int64, t tasks.Task) Executable {
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler)
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler, func() bool { return false })
}
s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *sliceSuite) SetupTest() {
s.controller = gomock.NewController(s.T())

s.executableInitializer = func(readerID int64, t tasks.Task) Executable {
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler)
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, nil, metrics.NoopMetricsHandler, func() bool { return false })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can create a function doDropInternalErrors or something like this.

}
s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (q SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks(ts []tasks.Task) {
q.clusterMetadata,
q.logger,
q.metricsHandler,
func() bool { return false },
), wttt)
q.timeoutQueue.Add(executable)
}
Expand Down
1 change: 1 addition & 0 deletions service/history/timer_queue_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1610,5 +1610,6 @@ func (s *timerQueueActiveTaskExecutorSuite) newTaskExecutable(
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -1505,5 +1505,6 @@ func (s *timerQueueStandbyTaskExecutorSuite) newTaskExecutable(
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2805,5 +2805,6 @@ func (s *transferQueueActiveTaskExecutorSuite) newTaskExecutable(
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -1269,5 +1269,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) newTaskExecutable(
s.mockClusterMetadata,
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
1 change: 1 addition & 0 deletions service/history/visibility_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,5 +617,6 @@ func (s *visibilityQueueTaskExecutorSuite) newTaskExecutable(
s.mockShard.GetClusterMetadata(),
nil,
metrics.NoopMetricsHandler,
func() bool { return false },
)
}
Loading