From b4953a34d67864f8adc035bbed5f253d70909b42 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 28 Jan 2025 15:30:41 -0800 Subject: [PATCH] Add timeout for dynamodb ring kv (#6544) * add dynamodb kv with timeout enforced Signed-off-by: yeya24 * add tests Signed-off-by: yeya24 * docs Signed-off-by: Ben Ye * update changelog Signed-off-by: Ben Ye --------- Signed-off-by: yeya24 Signed-off-by: Ben Ye --- docs/blocks-storage/compactor.md | 4 ++ docs/blocks-storage/store-gateway.md | 4 ++ docs/configuration/config-file-reference.md | 28 ++++++++ pkg/ring/kv/dynamodb/client.go | 9 ++- pkg/ring/kv/dynamodb/client_test.go | 77 +++++++++++++++++++++ pkg/ring/kv/dynamodb/dynamodb.go | 39 +++++++++++ 6 files changed, 160 insertions(+), 1 deletion(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 1cfc53ec5c..15f8fe7383 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -225,6 +225,10 @@ compactor: # CLI flag: -compactor.ring.dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -compactor.ring.dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: compactor.ring [consul: ] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index fbd3c92af2..31005f0eae 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -240,6 +240,10 @@ store_gateway: # CLI flag: -store-gateway.sharding-ring.dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -store-gateway.sharding-ring.dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: # store-gateway.sharding-ring diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 43948773a5..e850a511b5 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -341,6 +341,10 @@ sharding_ring: # CLI flag: -alertmanager.sharding-ring.dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -alertmanager.sharding-ring.dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: alertmanager.sharding-ring [consul: ] @@ -2286,6 +2290,10 @@ sharding_ring: # CLI flag: -compactor.ring.dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -compactor.ring.dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: compactor.ring [consul: ] @@ -2595,6 +2603,10 @@ ha_tracker: # CLI flag: -distributor.ha-tracker.dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -distributor.ha-tracker.dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: distributor.ha-tracker [consul: ] @@ -2689,6 +2701,10 @@ ring: # CLI flag: -distributor.ring.dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -distributor.ring.dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: distributor.ring [consul: ] @@ -3017,6 +3033,10 @@ lifecycler: # CLI flag: -dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. [consul: ] @@ -4674,6 +4694,10 @@ ring: # CLI flag: -ruler.ring.dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -ruler.ring.dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: ruler.ring [consul: ] @@ -5665,6 +5689,10 @@ sharding_ring: # CLI flag: -store-gateway.sharding-ring.dynamodb.max-cas-retries [max_cas_retries: | default = 10] + # Timeout of dynamoDbClient requests. Default is 2m. + # CLI flag: -store-gateway.sharding-ring.dynamodb.timeout + [timeout: | default = 2m] + # The consul_config configures the consul client. # The CLI flags prefix for this block config is: store-gateway.sharding-ring [consul: ] diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go index 71de47f0e5..0fb53294d1 100644 --- a/pkg/ring/kv/dynamodb/client.go +++ b/pkg/ring/kv/dynamodb/client.go @@ -26,6 +26,7 @@ type Config struct { TTL time.Duration `yaml:"ttl"` PullerSyncTime time.Duration `yaml:"puller_sync_time"` MaxCasRetries int `yaml:"max_cas_retries"` + Timeout time.Duration `yaml:"timeout"` } type Client struct { @@ -53,6 +54,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) { f.DurationVar(&cfg.TTL, prefix+"dynamodb.ttl-time", 0, "Time to expire items on dynamodb.") f.DurationVar(&cfg.PullerSyncTime, prefix+"dynamodb.puller-sync-time", 60*time.Second, "Time to refresh local ring with information on dynamodb.") f.IntVar(&cfg.MaxCasRetries, prefix+"dynamodb.max-cas-retries", maxCasRetries, "Maximum number of retries for DDB KV CAS.") + f.DurationVar(&cfg.Timeout, prefix+"dynamodb.timeout", 2*time.Minute, "Timeout of dynamoDbClient requests. Default is 2m.") } func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) { @@ -69,8 +71,13 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh MaxRetries: cfg.MaxCasRetries, } + var kv dynamoDbClient + kv = dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics} + if cfg.Timeout > 0 { + kv = newDynamodbKVWithTimeout(kv, cfg.Timeout) + } c := &Client{ - kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics}, + kv: kv, codec: cc, logger: ddbLog(logger), ddbMetrics: ddbMetrics, diff --git a/pkg/ring/kv/dynamodb/client_test.go b/pkg/ring/kv/dynamodb/client_test.go index 666e2cf311..7cefe64f75 100644 --- a/pkg/ring/kv/dynamodb/client_test.go +++ b/pkg/ring/kv/dynamodb/client_test.go @@ -302,6 +302,29 @@ func Test_UpdateStaleData(t *testing.T) { } +func Test_DynamodbKVWithTimeout(t *testing.T) { + ddbMock := NewDynamodbClientMock() + // Backend has delay of 5s while the client timeout is 1s. + ddbWithDelay := newDynamodbKVWithDelay(ddbMock, time.Second*5) + dbWithTimeout := newDynamodbKVWithTimeout(ddbWithDelay, time.Second) + + ctx := context.Background() + _, _, err := dbWithTimeout.List(ctx, dynamodbKey{primaryKey: key}) + require.True(t, errors.Is(err, context.DeadlineExceeded)) + + err = dbWithTimeout.Delete(ctx, dynamodbKey{primaryKey: key}) + require.True(t, errors.Is(err, context.DeadlineExceeded)) + + _, _, err = dbWithTimeout.Query(ctx, dynamodbKey{primaryKey: key}, true) + require.True(t, errors.Is(err, context.DeadlineExceeded)) + + err = dbWithTimeout.Put(ctx, dynamodbKey{primaryKey: key}, []byte{}) + require.True(t, errors.Is(err, context.DeadlineExceeded)) + + err = dbWithTimeout.Batch(ctx, nil, nil) + require.True(t, errors.Is(err, context.DeadlineExceeded)) +} + // NewClientMock makes a new local dynamodb client. func NewClientMock(ddbClient dynamoDbClient, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer, time time.Duration, config backoff.Config) *Client { return &Client{ @@ -429,3 +452,57 @@ func (m *DescMock) FindDifference(that codec.MultiKey) (interface{}, []string, e } return args.Get(0), args.Get(1).([]string), err } + +type dynamodbKVWithDelayAndContextCheck struct { + ddbClient dynamoDbClient + delay time.Duration +} + +func newDynamodbKVWithDelay(client dynamoDbClient, delay time.Duration) *dynamodbKVWithDelayAndContextCheck { + return &dynamodbKVWithDelayAndContextCheck{ddbClient: client, delay: delay} +} + +func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) { + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + case <-time.After(d.delay): + return d.ddbClient.List(ctx, key) + } +} + +func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + case <-time.After(d.delay): + return d.ddbClient.Query(ctx, key, isPrefix) + } +} + +func (d *dynamodbKVWithDelayAndContextCheck) Delete(ctx context.Context, key dynamodbKey) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(d.delay): + return d.ddbClient.Delete(ctx, key) + } +} + +func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamodbKey, data []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(d.delay): + return d.ddbClient.Put(ctx, key, data) + } +} + +func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(d.delay): + return d.ddbClient.Batch(ctx, put, delete) + } +} diff --git a/pkg/ring/kv/dynamodb/dynamodb.go b/pkg/ring/kv/dynamodb/dynamodb.go index f54e5fe55b..2dc4769d6e 100644 --- a/pkg/ring/kv/dynamodb/dynamodb.go +++ b/pkg/ring/kv/dynamodb/dynamodb.go @@ -259,6 +259,45 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[st return item } +type dynamodbKVWithTimeout struct { + ddbClient dynamoDbClient + timeout time.Duration +} + +func newDynamodbKVWithTimeout(client dynamoDbClient, timeout time.Duration) *dynamodbKVWithTimeout { + return &dynamodbKVWithTimeout{ddbClient: client, timeout: timeout} +} + +func (d *dynamodbKVWithTimeout) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.List(ctx, key) +} + +func (d *dynamodbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.Query(ctx, key, isPrefix) +} + +func (d *dynamodbKVWithTimeout) Delete(ctx context.Context, key dynamodbKey) error { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.Delete(ctx, key) +} + +func (d *dynamodbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data []byte) error { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.Put(ctx, key, data) +} + +func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error { + ctx, cancel := context.WithTimeout(ctx, d.timeout) + defer cancel() + return d.ddbClient.Batch(ctx, put, delete) +} + func generateItemKey(key dynamodbKey) map[string]*dynamodb.AttributeValue { resp := map[string]*dynamodb.AttributeValue{ primaryKey: {