From 26f873f6dd2e8c6eb889d6753d6e81c3a664e32b Mon Sep 17 00:00:00 2001 From: yux0 Date: Mon, 3 Feb 2025 14:27:31 -0800 Subject: [PATCH 1/4] Wait for namespace handover update --- common/dynamicconfig/constants.go | 5 + common/metrics/metric_defs.go | 1 + common/rpc/interceptor/namespace_handover.go | 144 ++++++++++++++++++ common/rpc/interceptor/namespace_validator.go | 46 +++--- common/util.go | 6 +- service/frontend/fx.go | 19 +++ service/frontend/service.go | 2 + 7 files changed, 197 insertions(+), 26 deletions(-) create mode 100644 common/rpc/interceptor/namespace_handover.go diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index f6d2b280ec0..98b0cad922e 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -152,6 +152,11 @@ values in system search attributes.`, `EnableNamespaceNotActiveAutoForwarding whether enabling DC auto forwarding to active cluster for signal / start / signal with start API if namespace is not active`, ) + EnableNamespaceHandoverWait = NewNamespaceBoolSetting( + "system.enableNamespaceHandoverWait", + true, + `EnableNamespaceHandoverWait whether waiting for namespace replication state update before serve the request`, + ) TransactionSizeLimit = NewGlobalIntSetting( "system.transactionSizeLimit", primitives.DefaultTransactionSizeLimit, diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index 7360e81c825..6b5f15d6cea 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -700,6 +700,7 @@ var ( ) HostRPSLimit = NewGaugeDef("host_rps_limit") NamespaceHostRPSLimit = NewGaugeDef("namespace_host_rps_limit") + HandoverWaitLatency = NewTimerDef("handover_wait_latency") // History CacheRequests = NewCounterDef("cache_requests") diff --git a/common/rpc/interceptor/namespace_handover.go b/common/rpc/interceptor/namespace_handover.go new file mode 100644 index 00000000000..181e1e6fc05 --- /dev/null +++ b/common/rpc/interceptor/namespace_handover.go @@ -0,0 +1,144 @@ +package interceptor + +import ( + "context" + "strings" + "time" + + "github.com/google/uuid" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/server/common" + "go.temporal.io/server/common/api" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "google.golang.org/grpc" +) + +const ( + ctxTailRoom = time.Millisecond * 100 +) + +var _ grpc.UnaryServerInterceptor = (*NamespaceHandoverInterceptor)(nil).Intercept + +type ( + // NamespaceHandoverInterceptor handles the namespace in handover replication state + NamespaceHandoverInterceptor struct { + namespaceRegistry namespace.Registry + timeSource clock.TimeSource + enabledForNS dynamicconfig.BoolPropertyFnWithNamespaceFilter + metricsHandler metrics.Handler + logger log.Logger + } +) + +func NewNamespaceHandoverInterceptor( + enabledForNS dynamicconfig.BoolPropertyFnWithNamespaceFilter, + namespaceRegistry namespace.Registry, + metricsHandler metrics.Handler, + logger log.Logger, + timeSource clock.TimeSource, +) *NamespaceHandoverInterceptor { + + return &NamespaceHandoverInterceptor{ + enabledForNS: enabledForNS, + namespaceRegistry: namespaceRegistry, + metricsHandler: metricsHandler, + logger: logger, + timeSource: timeSource, + } +} + +func (i *NamespaceHandoverInterceptor) Intercept( + ctx context.Context, + req any, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (_ any, retError error) { + defer log.CapturePanic(i.logger, &retError) + + if !strings.HasPrefix(info.FullMethod, api.WorkflowServicePrefix) { + return handler(ctx, req) + } + + // review which method is allowed + methodName := api.MethodName(info.FullMethod) + namespaceName, err := GetNamespaceName(i.namespaceRegistry, req) + if err != nil { + return nil, err + } + + if i.enabledForNS(namespaceName.String()) { + startTime := i.timeSource.Now() + defer func() { + metrics.HandoverWaitLatency.With(i.metricsHandler).Record(time.Since(startTime)) + }() + _, err = i.waitNamespaceHandoverUpdate(ctx, namespaceName, methodName) + if err != nil { + return nil, err + } + } + + return handler(ctx, req) +} + +func (i *NamespaceHandoverInterceptor) waitNamespaceHandoverUpdate( + ctx context.Context, + namespaceName namespace.Name, + methodName string, +) (waitTime time.Duration, retErr error) { + if _, ok := allowedMethodsDuringHandover[methodName]; ok { + return + } + + startTime := time.Now() + namespaceData, err := i.namespaceRegistry.GetNamespace(namespaceName) + if err != nil { + return 0, err + } + if namespaceData.ReplicationState() == enumspb.REPLICATION_STATE_HANDOVER { + cbID := uuid.New() + waitReplicationStateUpdate := make(chan struct{}) + i.namespaceRegistry.RegisterStateChangeCallback(cbID, func(ns *namespace.Namespace, deletedFromDb bool) { + if ns.ID().String() != namespaceData.ID().String() { + return + } + if ns.State() != enumspb.NAMESPACE_STATE_REGISTERED || + deletedFromDb || + ns.ReplicationState() != enumspb.REPLICATION_STATE_HANDOVER || + !ns.IsGlobalNamespace() { + // Stop wait on state change if: + // 1. namespace is deleting/deleted + // 2. namespace is not in handover + // 3. namespace is not global + select { + case <-waitReplicationStateUpdate: + default: + close(waitReplicationStateUpdate) + } + } + }) + + childCtx := context.Background() + if deadline, ok := ctx.Deadline(); ok { + var childCancelFn context.CancelFunc + childCtx, childCancelFn = context.WithDeadline(ctx, deadline.Add(-ctxTailRoom)) + defer func() { + childCancelFn() + }() + } + select { + case <-childCtx.Done(): + err = common.ErrNamespaceHandover + case <-waitReplicationStateUpdate: + } + i.namespaceRegistry.UnregisterStateChangeCallback(cbID) + if err != nil { + // error? + return time.Since(startTime), err + } + } + return time.Since(startTime), nil +} diff --git a/common/rpc/interceptor/namespace_validator.go b/common/rpc/interceptor/namespace_validator.go index eea16a7a623..5e9f490a2e0 100644 --- a/common/rpc/interceptor/namespace_validator.go +++ b/common/rpc/interceptor/namespace_validator.go @@ -77,31 +77,27 @@ var ( allowedNamespaceStatesDefault = []enumspb.NamespaceState{enumspb.NAMESPACE_STATE_REGISTERED, enumspb.NAMESPACE_STATE_DEPRECATED} allowedMethodsDuringHandover = map[string]struct{}{ - "DescribeNamespace": {}, - "UpdateNamespace": {}, - "GetReplicationMessages": {}, - "ReplicateEventsV2": {}, - "GetWorkflowExecutionRawHistory": {}, - "GetWorkflowExecutionRawHistoryV2": {}, - "GetWorkflowExecutionHistory": {}, - "GetWorkflowExecutionHistoryReverse": {}, - "DescribeWorkflowExecution": {}, - "DescribeTaskQueue": {}, - "ListTaskQueuePartitions": {}, - "ListOpenWorkflowExecutions": {}, - "ListClosedWorkflowExecutions": {}, - "ListWorkflowExecutions": {}, - "ListArchivedWorkflowExecutions": {}, - "ScanWorkflowExecutions": {}, - "CountWorkflowExecutions": {}, - "DescribeSchedule": {}, - "ListScheduleMatchingTimes": {}, - "ListSchedules": {}, - "GetWorkerBuildIdCompatibility": {}, - "GetWorkerVersioningRules": {}, - "GetWorkerTaskReachability": {}, - "DescribeBatchOperation": {}, - "ListBatchOperations": {}, + // Namespace APIs + "DeprecateNamespace": {}, + "DescribeNamespace": {}, + "UpdateNamespace": {}, + "ListNamespaces": {}, + "RegisterNamespace": {}, + // Replication APIs + "GetReplicationMessages": {}, + "ReplicateEventsV2": {}, + "GetWorkflowExecutionRawHistory": {}, + "GetWorkflowExecutionRawHistoryV2": {}, + // Visibility APIs + "ListTaskQueuePartitions": {}, + "ListOpenWorkflowExecutions": {}, + "ListClosedWorkflowExecutions": {}, + "ListWorkflowExecutions": {}, + "ListArchivedWorkflowExecutions": {}, + "ScanWorkflowExecutions": {}, + "CountWorkflowExecutions": {}, + "ListSchedules": {}, + "ListBatchOperations": {}, } ) diff --git a/common/util.go b/common/util.go index 03b7ed32ae3..95107c3c89f 100644 --- a/common/util.go +++ b/common/util.go @@ -338,7 +338,7 @@ func IsServiceClientTransientError(err error) bool { } func IsServiceHandlerRetryableError(err error) bool { - if err.Error() == ErrNamespaceHandover.Error() { + if IsNamespaceHandoverError(err) { return false } @@ -357,6 +357,10 @@ func IsServiceHandlerRetryableError(err error) bool { return false } +func IsNamespaceHandoverError(err error) bool { + return err.Error() == ErrNamespaceHandover.Error() +} + func IsStickyWorkerUnavailable(err error) bool { switch err.(type) { case *serviceerrors.StickyWorkerUnavailable: diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 49ade9d59ee..7bf331a92d0 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -98,6 +98,7 @@ var Module = fx.Options( fx.Provide(MuxRouterProvider), fx.Provide(ConfigProvider), fx.Provide(NamespaceLogInterceptorProvider), + fx.Provide(NamespaceHandoverInterceptorProvider), fx.Provide(RedirectionInterceptorProvider), fx.Provide(TelemetryInterceptorProvider), fx.Provide(RetryableInterceptorProvider), @@ -217,6 +218,7 @@ func GrpcServerOptionsProvider( namespaceRateLimiterInterceptor *interceptor.NamespaceRateLimitInterceptor, namespaceCountLimiterInterceptor *interceptor.ConcurrentRequestLimitInterceptor, namespaceValidatorInterceptor *interceptor.NamespaceValidatorInterceptor, + namespaceHandoverInterceptor *interceptor.NamespaceHandoverInterceptor, redirectionInterceptor *interceptor.Redirection, telemetryInterceptor *interceptor.TelemetryInterceptor, retryableInterceptor *interceptor.RetryableInterceptor, @@ -267,6 +269,7 @@ func GrpcServerOptionsProvider( namespaceLogInterceptor.Intercept, // TODO: Deprecate this with a outer custom interceptor metrics.NewServerMetricsContextInjectorInterceptor(), authInterceptor.Intercept, + namespaceHandoverInterceptor.Intercept, redirectionInterceptor.Intercept, telemetryInterceptor.UnaryIntercept, healthInterceptor.Intercept, @@ -353,6 +356,22 @@ func RedirectionInterceptorProvider( ) } +func NamespaceHandoverInterceptorProvider( + configuration *Config, + namespaceCache namespace.Registry, + logger log.Logger, + metricsHandler metrics.Handler, + timeSource clock.TimeSource, +) *interceptor.NamespaceHandoverInterceptor { + return interceptor.NewNamespaceHandoverInterceptor( + configuration.EnableNamespaceHandoverWait, + namespaceCache, + metricsHandler, + logger, + timeSource, + ) +} + func TelemetryInterceptorProvider( logger log.Logger, metricsHandler metrics.Handler, diff --git a/service/frontend/service.go b/service/frontend/service.go index cadde504dbe..cb86a9aa4f4 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -120,6 +120,7 @@ type Config struct { // Namespace specific config EnableNamespaceNotActiveAutoForwarding dynamicconfig.BoolPropertyFnWithNamespaceFilter + EnableNamespaceHandoverWait dynamicconfig.BoolPropertyFnWithNamespaceFilter SearchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter SearchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -293,6 +294,7 @@ func NewConfig( ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc), ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc), EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc), + EnableNamespaceHandoverWait: dynamicconfig.EnableNamespaceHandoverWait.Get(dc), SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc), SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc), SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc), From 656d66a0a3efdd2a904fece2c55a9cfacb8c757f Mon Sep 17 00:00:00 2001 From: yux0 Date: Wed, 19 Feb 2025 15:59:41 -0800 Subject: [PATCH 2/4] clean up code --- common/rpc/interceptor/namespace_handover.go | 8 +++----- service/frontend/fx.go | 2 ++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/common/rpc/interceptor/namespace_handover.go b/common/rpc/interceptor/namespace_handover.go index 181e1e6fc05..191d5b67c6a 100644 --- a/common/rpc/interceptor/namespace_handover.go +++ b/common/rpc/interceptor/namespace_handover.go @@ -129,16 +129,14 @@ func (i *NamespaceHandoverInterceptor) waitNamespaceHandoverUpdate( childCancelFn() }() } + var handoverErr error select { case <-childCtx.Done(): - err = common.ErrNamespaceHandover + handoverErr = common.ErrNamespaceHandover case <-waitReplicationStateUpdate: } i.namespaceRegistry.UnregisterStateChangeCallback(cbID) - if err != nil { - // error? - return time.Since(startTime), err - } + return time.Since(startTime), handoverErr } return time.Since(startTime), nil } diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 7bf331a92d0..3416cb867c0 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -269,6 +269,8 @@ func GrpcServerOptionsProvider( namespaceLogInterceptor.Intercept, // TODO: Deprecate this with a outer custom interceptor metrics.NewServerMetricsContextInjectorInterceptor(), authInterceptor.Intercept, + // Handover interceptor has to above redirection because the request will route to the correct cluster after handover completed. + // And retry cannot be performed before customInterceptors. namespaceHandoverInterceptor.Intercept, redirectionInterceptor.Intercept, telemetryInterceptor.UnaryIntercept, From f60b2b036400346d527a9db210bba4f830b4feef Mon Sep 17 00:00:00 2001 From: yux0 Date: Wed, 19 Feb 2025 16:10:17 -0800 Subject: [PATCH 3/4] Update metric --- common/dynamicconfig/constants.go | 2 +- common/rpc/interceptor/namespace_handover.go | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 98b0cad922e..6a5a27c80a1 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -154,7 +154,7 @@ for signal / start / signal with start API if namespace is not active`, ) EnableNamespaceHandoverWait = NewNamespaceBoolSetting( "system.enableNamespaceHandoverWait", - true, + false, `EnableNamespaceHandoverWait whether waiting for namespace replication state update before serve the request`, ) TransactionSizeLimit = NewGlobalIntSetting( diff --git a/common/rpc/interceptor/namespace_handover.go b/common/rpc/interceptor/namespace_handover.go index 191d5b67c6a..b76edfb50d1 100644 --- a/common/rpc/interceptor/namespace_handover.go +++ b/common/rpc/interceptor/namespace_handover.go @@ -71,11 +71,13 @@ func (i *NamespaceHandoverInterceptor) Intercept( } if i.enabledForNS(namespaceName.String()) { - startTime := i.timeSource.Now() + var waitTime *time.Duration defer func() { - metrics.HandoverWaitLatency.With(i.metricsHandler).Record(time.Since(startTime)) + if waitTime != nil { + metrics.HandoverWaitLatency.With(i.metricsHandler).Record(*waitTime) + } }() - _, err = i.waitNamespaceHandoverUpdate(ctx, namespaceName, methodName) + waitTime, err = i.waitNamespaceHandoverUpdate(ctx, namespaceName, methodName) if err != nil { return nil, err } @@ -88,15 +90,15 @@ func (i *NamespaceHandoverInterceptor) waitNamespaceHandoverUpdate( ctx context.Context, namespaceName namespace.Name, methodName string, -) (waitTime time.Duration, retErr error) { +) (waitTime *time.Duration, retErr error) { if _, ok := allowedMethodsDuringHandover[methodName]; ok { return } - startTime := time.Now() + startTime := i.timeSource.Now() namespaceData, err := i.namespaceRegistry.GetNamespace(namespaceName) if err != nil { - return 0, err + return nil, err } if namespaceData.ReplicationState() == enumspb.REPLICATION_STATE_HANDOVER { cbID := uuid.New() @@ -136,7 +138,8 @@ func (i *NamespaceHandoverInterceptor) waitNamespaceHandoverUpdate( case <-waitReplicationStateUpdate: } i.namespaceRegistry.UnregisterStateChangeCallback(cbID) - return time.Since(startTime), handoverErr + waitTime := time.Since(startTime) + return &waitTime, handoverErr } - return time.Since(startTime), nil + return nil, nil } From 557c48cbf09f1081d34565b89047cb990a25ed15 Mon Sep 17 00:00:00 2001 From: yux0 Date: Thu, 20 Feb 2025 10:43:36 -0800 Subject: [PATCH 4/4] update --- common/dynamicconfig/constants.go | 2 +- common/rpc/interceptor/namespace_handover.go | 45 +++++++++---------- common/rpc/interceptor/namespace_validator.go | 6 +++ service/frontend/fx.go | 4 +- 4 files changed, 30 insertions(+), 27 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 6a5a27c80a1..98b0cad922e 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -154,7 +154,7 @@ for signal / start / signal with start API if namespace is not active`, ) EnableNamespaceHandoverWait = NewNamespaceBoolSetting( "system.enableNamespaceHandoverWait", - false, + true, `EnableNamespaceHandoverWait whether waiting for namespace replication state update before serve the request`, ) TransactionSizeLimit = NewGlobalIntSetting( diff --git a/common/rpc/interceptor/namespace_handover.go b/common/rpc/interceptor/namespace_handover.go index b76edfb50d1..2b06c87d1a9 100644 --- a/common/rpc/interceptor/namespace_handover.go +++ b/common/rpc/interceptor/namespace_handover.go @@ -26,16 +26,17 @@ var _ grpc.UnaryServerInterceptor = (*NamespaceHandoverInterceptor)(nil).Interce type ( // NamespaceHandoverInterceptor handles the namespace in handover replication state NamespaceHandoverInterceptor struct { - namespaceRegistry namespace.Registry - timeSource clock.TimeSource - enabledForNS dynamicconfig.BoolPropertyFnWithNamespaceFilter - metricsHandler metrics.Handler - logger log.Logger + namespaceRegistry namespace.Registry + timeSource clock.TimeSource + enabledForNS dynamicconfig.BoolPropertyFnWithNamespaceFilter + nsCacheRefreshInterval dynamicconfig.DurationPropertyFn + metricsHandler metrics.Handler + logger log.Logger } ) func NewNamespaceHandoverInterceptor( - enabledForNS dynamicconfig.BoolPropertyFnWithNamespaceFilter, + dc *dynamicconfig.Collection, namespaceRegistry namespace.Registry, metricsHandler metrics.Handler, logger log.Logger, @@ -43,11 +44,12 @@ func NewNamespaceHandoverInterceptor( ) *NamespaceHandoverInterceptor { return &NamespaceHandoverInterceptor{ - enabledForNS: enabledForNS, - namespaceRegistry: namespaceRegistry, - metricsHandler: metricsHandler, - logger: logger, - timeSource: timeSource, + enabledForNS: dynamicconfig.EnableNamespaceHandoverWait.Get(dc), + nsCacheRefreshInterval: dynamicconfig.NamespaceCacheRefreshInterval.Get(dc), + namespaceRegistry: namespaceRegistry, + metricsHandler: metricsHandler, + logger: logger, + timeSource: timeSource, } } @@ -65,19 +67,16 @@ func (i *NamespaceHandoverInterceptor) Intercept( // review which method is allowed methodName := api.MethodName(info.FullMethod) - namespaceName, err := GetNamespaceName(i.namespaceRegistry, req) - if err != nil { - return nil, err - } + namespaceName := MustGetNamespaceName(i.namespaceRegistry, req) - if i.enabledForNS(namespaceName.String()) { + if namespaceName != namespace.EmptyName && i.enabledForNS(namespaceName.String()) { var waitTime *time.Duration defer func() { if waitTime != nil { metrics.HandoverWaitLatency.With(i.metricsHandler).Record(*waitTime) } }() - waitTime, err = i.waitNamespaceHandoverUpdate(ctx, namespaceName, methodName) + waitTime, err := i.waitNamespaceHandoverUpdate(ctx, namespaceName, methodName) if err != nil { return nil, err } @@ -123,19 +122,17 @@ func (i *NamespaceHandoverInterceptor) waitNamespaceHandoverUpdate( } }) - childCtx := context.Background() + maxWaitDuration := i.nsCacheRefreshInterval() // cache refresh time if deadline, ok := ctx.Deadline(); ok { - var childCancelFn context.CancelFunc - childCtx, childCancelFn = context.WithDeadline(ctx, deadline.Add(-ctxTailRoom)) - defer func() { - childCancelFn() - }() + maxWaitDuration = max(0, time.Until(deadline)-ctxTailRoom) } + returnTimer := time.NewTimer(maxWaitDuration) var handoverErr error select { - case <-childCtx.Done(): + case <-returnTimer.C: handoverErr = common.ErrNamespaceHandover case <-waitReplicationStateUpdate: + returnTimer.Stop() } i.namespaceRegistry.UnregisterStateChangeCallback(cbID) waitTime := time.Since(startTime) diff --git a/common/rpc/interceptor/namespace_validator.go b/common/rpc/interceptor/namespace_validator.go index 5e9f490a2e0..79dd1178f54 100644 --- a/common/rpc/interceptor/namespace_validator.go +++ b/common/rpc/interceptor/namespace_validator.go @@ -77,6 +77,10 @@ var ( allowedNamespaceStatesDefault = []enumspb.NamespaceState{enumspb.NAMESPACE_STATE_REGISTERED, enumspb.NAMESPACE_STATE_DEPRECATED} allowedMethodsDuringHandover = map[string]struct{}{ + // System + "GetSystemInfo": {}, + "GetSearchAttributes": {}, + "GetClusterInfo": {}, // Namespace APIs "DeprecateNamespace": {}, "DescribeNamespace": {}, @@ -98,6 +102,8 @@ var ( "CountWorkflowExecutions": {}, "ListSchedules": {}, "ListBatchOperations": {}, + // Matching + "ShutdownWorker": {}, } ) diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 3416cb867c0..eda9336337e 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -359,14 +359,14 @@ func RedirectionInterceptorProvider( } func NamespaceHandoverInterceptorProvider( - configuration *Config, + dc *dynamicconfig.Collection, namespaceCache namespace.Registry, logger log.Logger, metricsHandler metrics.Handler, timeSource clock.TimeSource, ) *interceptor.NamespaceHandoverInterceptor { return interceptor.NewNamespaceHandoverInterceptor( - configuration.EnableNamespaceHandoverWait, + dc, namespaceCache, metricsHandler, logger,