Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for handover state update before serving the requests #7028

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ values in system search attributes.`,
`EnableNamespaceNotActiveAutoForwarding whether enabling DC auto forwarding to active cluster
for signal / start / signal with start API if namespace is not active`,
)
EnableNamespaceHandoverWait = NewNamespaceBoolSetting(
"system.enableNamespaceHandoverWait",
true,
`EnableNamespaceHandoverWait whether waiting for namespace replication state update before serve the request`,
)
TransactionSizeLimit = NewGlobalIntSetting(
"system.transactionSizeLimit",
primitives.DefaultTransactionSizeLimit,
Expand Down
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ var (
)
HostRPSLimit = NewGaugeDef("host_rps_limit")
NamespaceHostRPSLimit = NewGaugeDef("namespace_host_rps_limit")
HandoverWaitLatency = NewTimerDef("handover_wait_latency")

// History
CacheRequests = NewCounterDef("cache_requests")
Expand Down
142 changes: 142 additions & 0 deletions common/rpc/interceptor/namespace_handover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package interceptor

import (
"context"
"strings"
"time"

"github.com/google/uuid"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/api"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"google.golang.org/grpc"
)

// ctxTailRoom is subtracted from the time remaining until the request
// context's deadline when computing how long a request may wait for a
// namespace handover to finish.
const ctxTailRoom = 100 * time.Millisecond

// Compile-time check that NamespaceHandoverInterceptor.Intercept has the
// signature of a grpc.UnaryServerInterceptor.
var _ grpc.UnaryServerInterceptor = (*NamespaceHandoverInterceptor)(nil).Intercept

// NamespaceHandoverInterceptor handles requests for namespaces that are in
// the handover replication state.
type NamespaceHandoverInterceptor struct {
	// namespaceRegistry resolves namespaces and delivers state-change callbacks.
	namespaceRegistry namespace.Registry
	// timeSource provides the start timestamp of a handover wait.
	timeSource clock.TimeSource
	// enabledForNS reports whether handover waiting is enabled for a namespace.
	enabledForNS dynamicconfig.BoolPropertyFnWithNamespaceFilter
	// nsCacheRefreshInterval bounds the wait when the request context has no deadline.
	nsCacheRefreshInterval dynamicconfig.DurationPropertyFn
	// metricsHandler receives the handover wait latency metric.
	metricsHandler metrics.Handler
	// logger is used when capturing panics raised while intercepting a request.
	logger log.Logger
}

// NewNamespaceHandoverInterceptor builds a NamespaceHandoverInterceptor.
// The per-namespace enable flag and the namespace cache refresh interval are
// resolved from the dynamic config collection dc; the remaining arguments are
// stored as-is.
func NewNamespaceHandoverInterceptor(
	dc *dynamicconfig.Collection,
	namespaceRegistry namespace.Registry,
	metricsHandler metrics.Handler,
	logger log.Logger,
	timeSource clock.TimeSource,
) *NamespaceHandoverInterceptor {
	interceptor := &NamespaceHandoverInterceptor{
		namespaceRegistry:      namespaceRegistry,
		timeSource:             timeSource,
		enabledForNS:           dynamicconfig.EnableNamespaceHandoverWait.Get(dc),
		nsCacheRefreshInterval: dynamicconfig.NamespaceCacheRefreshInterval.Get(dc),
		metricsHandler:         metricsHandler,
		logger:                 logger,
	}
	return interceptor
}

// Intercept is a gRPC unary server interceptor. Requests outside the workflow
// service, requests without a namespace, and requests for namespaces where the
// handover wait is disabled go straight to handler. For all other requests it
// first calls waitNamespaceHandoverUpdate; if that returns an error the
// request is rejected with it, otherwise handler is invoked.
//
// Panics raised while intercepting are captured into the returned error.
func (i *NamespaceHandoverInterceptor) Intercept(
	ctx context.Context,
	req any,
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (_ any, retError error) {
	defer log.CapturePanic(i.logger, &retError)

	if !strings.HasPrefix(info.FullMethod, api.WorkflowServicePrefix) {
		return handler(ctx, req)
	}

	// The bare method name decides whether the call is allowed during handover.
	methodName := api.MethodName(info.FullMethod)
	namespaceName := MustGetNamespaceName(i.namespaceRegistry, req)

	if namespaceName == namespace.EmptyName || !i.enabledForNS(namespaceName.String()) {
		return handler(ctx, req)
	}

	// The latency is emitted when Intercept returns, and only if a wait
	// actually took place (waited stays nil otherwise).
	var waited *time.Duration
	defer func() {
		if waited == nil {
			return
		}
		metrics.HandoverWaitLatency.With(i.metricsHandler).Record(*waited)
	}()

	var waitErr error
	waited, waitErr = i.waitNamespaceHandoverUpdate(ctx, namespaceName, methodName)
	if waitErr != nil {
		return nil, waitErr
	}

	return handler(ctx, req)
}

func (i *NamespaceHandoverInterceptor) waitNamespaceHandoverUpdate(
ctx context.Context,
namespaceName namespace.Name,
methodName string,
) (waitTime *time.Duration, retErr error) {
if _, ok := allowedMethodsDuringHandover[methodName]; ok {
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: returning nil, nil is surprising I think for the caller.

}

startTime := i.timeSource.Now()
namespaceData, err := i.namespaceRegistry.GetNamespace(namespaceName)
if err != nil {
return nil, err
}
if namespaceData.ReplicationState() == enumspb.REPLICATION_STATE_HANDOVER {
cbID := uuid.New()
waitReplicationStateUpdate := make(chan struct{})
i.namespaceRegistry.RegisterStateChangeCallback(cbID, func(ns *namespace.Namespace, deletedFromDb bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably should be change the impl to listen on changes for a specific namespace...
Can be done in a later PR.

if ns.ID().String() != namespaceData.ID().String() {
return
}
if ns.State() != enumspb.NAMESPACE_STATE_REGISTERED ||
deletedFromDb ||
ns.ReplicationState() != enumspb.REPLICATION_STATE_HANDOVER ||
!ns.IsGlobalNamespace() {
// Stop wait on state change if:
// 1. namespace is deleting/deleted
// 2. namespace is not in handover
// 3. namespace is not global
select {
case <-waitReplicationStateUpdate:
default:
close(waitReplicationStateUpdate)
}
}
})

maxWaitDuration := i.nsCacheRefreshInterval() // cache refresh time
if deadline, ok := ctx.Deadline(); ok {
maxWaitDuration = max(0, time.Until(deadline)-ctxTailRoom)
}
returnTimer := time.NewTimer(maxWaitDuration)
var handoverErr error
select {
case <-returnTimer.C:
handoverErr = common.ErrNamespaceHandover
case <-waitReplicationStateUpdate:
returnTimer.Stop()
}
i.namespaceRegistry.UnregisterStateChangeCallback(cbID)
waitTime := time.Since(startTime)
return &waitTime, handoverErr
}
return nil, nil
}
52 changes: 27 additions & 25 deletions common/rpc/interceptor/namespace_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,33 @@ var (
allowedNamespaceStatesDefault = []enumspb.NamespaceState{enumspb.NAMESPACE_STATE_REGISTERED, enumspb.NAMESPACE_STATE_DEPRECATED}

allowedMethodsDuringHandover = map[string]struct{}{
"DescribeNamespace": {},
"UpdateNamespace": {},
"GetReplicationMessages": {},
"ReplicateEventsV2": {},
"GetWorkflowExecutionRawHistory": {},
"GetWorkflowExecutionRawHistoryV2": {},
"GetWorkflowExecutionHistory": {},
"GetWorkflowExecutionHistoryReverse": {},
"DescribeWorkflowExecution": {},
"DescribeTaskQueue": {},
"ListTaskQueuePartitions": {},
"ListOpenWorkflowExecutions": {},
"ListClosedWorkflowExecutions": {},
"ListWorkflowExecutions": {},
"ListArchivedWorkflowExecutions": {},
"ScanWorkflowExecutions": {},
"CountWorkflowExecutions": {},
"DescribeSchedule": {},
"ListScheduleMatchingTimes": {},
"ListSchedules": {},
"GetWorkerBuildIdCompatibility": {},
"GetWorkerVersioningRules": {},
"GetWorkerTaskReachability": {},
"DescribeBatchOperation": {},
"ListBatchOperations": {},
// System
"GetSystemInfo": {},
"GetSearchAttributes": {},
"GetClusterInfo": {},
// Namespace APIs
"DeprecateNamespace": {},
"DescribeNamespace": {},
"UpdateNamespace": {},
"ListNamespaces": {},
"RegisterNamespace": {},
// Replication APIs
"GetReplicationMessages": {},
"ReplicateEventsV2": {},
"GetWorkflowExecutionRawHistory": {},
"GetWorkflowExecutionRawHistoryV2": {},
// Visibility APIs
"ListTaskQueuePartitions": {},
"ListOpenWorkflowExecutions": {},
"ListClosedWorkflowExecutions": {},
"ListWorkflowExecutions": {},
"ListArchivedWorkflowExecutions": {},
"ScanWorkflowExecutions": {},
"CountWorkflowExecutions": {},
"ListSchedules": {},
"ListBatchOperations": {},
// Matching
"ShutdownWorker": {},
}
)

Expand Down
6 changes: 5 additions & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func IsServiceClientTransientError(err error) bool {
}

func IsServiceHandlerRetryableError(err error) bool {
if err.Error() == ErrNamespaceHandover.Error() {
if IsNamespaceHandoverError(err) {
return false
}

Expand All @@ -357,6 +357,10 @@ func IsServiceHandlerRetryableError(err error) bool {
return false
}

// IsNamespaceHandoverError reports whether err carries the same message as
// ErrNamespaceHandover. The comparison is by message text rather than by
// identity. A nil err is never a handover error.
func IsNamespaceHandoverError(err error) bool {
	if err == nil {
		// Calling Error() on a nil error would panic.
		return false
	}
	return err.Error() == ErrNamespaceHandover.Error()
}

func IsStickyWorkerUnavailable(err error) bool {
switch err.(type) {
case *serviceerrors.StickyWorkerUnavailable:
Expand Down
21 changes: 21 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ var Module = fx.Options(
fx.Provide(MuxRouterProvider),
fx.Provide(ConfigProvider),
fx.Provide(NamespaceLogInterceptorProvider),
fx.Provide(NamespaceHandoverInterceptorProvider),
fx.Provide(RedirectionInterceptorProvider),
fx.Provide(TelemetryInterceptorProvider),
fx.Provide(RetryableInterceptorProvider),
Expand Down Expand Up @@ -217,6 +218,7 @@ func GrpcServerOptionsProvider(
namespaceRateLimiterInterceptor *interceptor.NamespaceRateLimitInterceptor,
namespaceCountLimiterInterceptor *interceptor.ConcurrentRequestLimitInterceptor,
namespaceValidatorInterceptor *interceptor.NamespaceValidatorInterceptor,
namespaceHandoverInterceptor *interceptor.NamespaceHandoverInterceptor,
redirectionInterceptor *interceptor.Redirection,
telemetryInterceptor *interceptor.TelemetryInterceptor,
retryableInterceptor *interceptor.RetryableInterceptor,
Expand Down Expand Up @@ -267,6 +269,9 @@ func GrpcServerOptionsProvider(
namespaceLogInterceptor.Intercept, // TODO: Deprecate this with a outer custom interceptor
metrics.NewServerMetricsContextInjectorInterceptor(),
authInterceptor.Intercept,
// The handover interceptor has to be above the redirection interceptor because the request will be routed to the correct cluster after the handover has completed.
// Also, the retry cannot be performed before customInterceptors.
namespaceHandoverInterceptor.Intercept,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments explaining why it must be in this place?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the handover state checking logic in StateValidationIntercept removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not remove it in this PR because I added the feature flag. Once we are confident in the new behavior, we can remove it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to disable the check in StateValidationIntercept when the feature flag is enabled?

redirectionInterceptor.Intercept,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess theoretically we can retry in redirection Interceptor? since it has the knowledge of whether redirect will actually happen or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess so but the logic could be weird. What should we do if the redirect will not happen?

telemetryInterceptor.UnaryIntercept,
healthInterceptor.Intercept,
Expand Down Expand Up @@ -353,6 +358,22 @@ func RedirectionInterceptorProvider(
)
}

// NamespaceHandoverInterceptorProvider constructs the namespace handover
// interceptor from the supplied dependencies by delegating to
// interceptor.NewNamespaceHandoverInterceptor.
func NamespaceHandoverInterceptorProvider(
	dc *dynamicconfig.Collection,
	namespaceCache namespace.Registry,
	logger log.Logger,
	metricsHandler metrics.Handler,
	timeSource clock.TimeSource,
) *interceptor.NamespaceHandoverInterceptor {
	// The constructor takes the metrics handler before the logger.
	handoverInterceptor := interceptor.NewNamespaceHandoverInterceptor(dc, namespaceCache, metricsHandler, logger, timeSource)
	return handoverInterceptor
}

func TelemetryInterceptorProvider(
logger log.Logger,
metricsHandler metrics.Handler,
Expand Down
2 changes: 2 additions & 0 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type Config struct {

// Namespace specific config
EnableNamespaceNotActiveAutoForwarding dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableNamespaceHandoverWait dynamicconfig.BoolPropertyFnWithNamespaceFilter

SearchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
SearchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -293,6 +294,7 @@ func NewConfig(
ShutdownDrainDuration: dynamicconfig.FrontendShutdownDrainDuration.Get(dc),
ShutdownFailHealthCheckDuration: dynamicconfig.FrontendShutdownFailHealthCheckDuration.Get(dc),
EnableNamespaceNotActiveAutoForwarding: dynamicconfig.EnableNamespaceNotActiveAutoForwarding.Get(dc),
EnableNamespaceHandoverWait: dynamicconfig.EnableNamespaceHandoverWait.Get(dc),
SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc),
SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc),
SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc),
Expand Down