From dc98f49b2a3d5c86f7fc5f22ba9a18e701a21f48 Mon Sep 17 00:00:00 2001
From: itaiad200
Date: Sat, 18 May 2024 21:09:14 +0300
Subject: [PATCH] Make MaxBatchDelay configurable (#7774)

* Make MaxBatchDelay configurable

* fix
---
 docs/reference/configuration.md |  3 ++-
 pkg/catalog/catalog.go          |  1 +
 pkg/config/config.go            |  1 +
 pkg/config/defaults.go          |  9 +++++++++
 pkg/graveler/ref/manager.go     | 21 ++++++++-------------
 5 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index 7a87b30fbb3..fa1ebb277ff 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -162,7 +162,8 @@ This reference uses `.` to denote the nesting of values.
 * `graveler.commit_cache.size` `(int : 50000)` - How many items to store in the commit cache.
 * `graveler.commit_cache.ttl` `(time duration : "10m")` - How long to store an item in the commit cache.
 * `graveler.commit_cache.jitter` `(time duration : "2s")` - A random amount of time between 0 and this value is added to each item's TTL.
-* `graveler.background.rate_limit` `(int : 0)` - Advence configuration to control background work done rate limit in requests per second (default: 0 - unlimited).
+* `graveler.max_batch_delay` `(time duration : "3ms")` - Maximum delay the server adds to reference-store read operations so that concurrent requests can be batched together.
+* `graveler.background.rate_limit` `(int : 0)` - Requests per second limit on background work performed, such as deleting committed staging tokens (default: 0 - unlimited).
 * `committed.local_cache` - an object describing the local (on-disk) cache of metadata from permanent storage:
   + `committed.local_cache.size_bytes` (`int` : `1073741824`) - bytes for local cache to use on disk. The cache may use more
     storage for short periods of time.
diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go
index 54f01682af5..0b33fa1b305 100644
--- a/pkg/catalog/catalog.go
+++ b/pkg/catalog/catalog.go
@@ -366,6 +366,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
 		AddressProvider:       addressProvider,
 		RepositoryCacheConfig: ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
 		CommitCacheConfig:     ref.CacheConfig(cfg.Config.Graveler.CommitCache),
+		MaxBatchDelay:         cfg.Config.Graveler.MaxBatchDelay,
 	})
 	gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, cfg.Config.Committed.BlockStoragePrefix)
 	settingManager := settings.NewManager(refManager, cfg.KVStore)
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 7115d09feb0..c8ec7021498 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -321,6 +321,7 @@ type Config struct {
 		Background struct {
 			RateLimit int `mapstructure:"rate_limit"`
 		} `mapstructure:"background"`
+		MaxBatchDelay time.Duration `mapstructure:"max_batch_delay"`
 	} `mapstructure:"graveler"`
 	Gateways struct {
 		S3 struct {
diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go
index 4d625faa315..4da130025ce 100644
--- a/pkg/config/defaults.go
+++ b/pkg/config/defaults.go
@@ -137,6 +137,15 @@ func setDefaults(cfgType string) {
 	viper.SetDefault("graveler.commit_cache.expiry", 10*time.Minute)
 	viper.SetDefault("graveler.commit_cache.jitter", 2*time.Second)
 
+	// MaxBatchDelay - 3ms was chosen as a max delay time for critical path queries.
+	// It trades off amount of queries per second (and thus effectiveness of the batching mechanism) with added latency.
+	// Since reducing # of expensive operations is only beneficial when there are a lot of concurrent requests,
+	//
+	// the sweet spot is probably between 1-5 milliseconds (representing 200-1000 requests/second to the data store).
+	//
+	// 3ms of delay with ~300 requests/second per resource sounds like a reasonable tradeoff.
+	viper.SetDefault("graveler.max_batch_delay", 3*time.Millisecond)
+
 	viper.SetDefault("ugc.prepare_interval", time.Minute)
 	viper.SetDefault("ugc.prepare_max_file_size", 20*1024*1024)
 
diff --git a/pkg/graveler/ref/manager.go b/pkg/graveler/ref/manager.go
index e5832c2e07b..8c13018b24c 100644
--- a/pkg/graveler/ref/manager.go
+++ b/pkg/graveler/ref/manager.go
@@ -17,14 +17,6 @@ import (
 )
 
 const (
-	// MaxBatchDelay - 3ms was chosen as a max delay time for critical path queries.
-	// It trades off amount of queries per second (and thus effectiveness of the batching mechanism) with added latency.
-	// Since reducing # of expensive operations is only beneficial when there are a lot of concurrent requests,
-	//
-	// the sweet spot is probably between 1-5 milliseconds (representing 200-1000 requests/second to the data store).
-	//
-	// 3ms of delay with ~300 requests/second per resource sounds like a reasonable tradeoff.
-	MaxBatchDelay = 3 * time.Millisecond
 	// commitIDStringLength string representation length of commit ID - based on hex representation of sha256
 	commitIDStringLength = 64
 	// ImportExpiryTime Expiry time to remove imports from ref-store
@@ -44,6 +36,7 @@ type Manager struct {
 	batchExecutor batch.Batcher
 	repoCache     cache.Cache
 	commitCache   cache.Cache
+	maxBatchDelay time.Duration
 }
 
 func branchFromProto(pb *graveler.BranchData) *graveler.Branch {
@@ -80,6 +73,7 @@ type ManagerConfig struct {
 	AddressProvider       ident.AddressProvider
 	RepositoryCacheConfig CacheConfig
 	CommitCacheConfig     CacheConfig
+	MaxBatchDelay         time.Duration
 }
 
 func NewRefManager(cfg ManagerConfig) *Manager {
@@ -90,6 +84,7 @@ func NewRefManager(cfg ManagerConfig) *Manager {
 		batchExecutor: cfg.Executor,
 		repoCache:     newCache(cfg.RepositoryCacheConfig),
 		commitCache:   newCache(cfg.CommitCacheConfig),
+		maxBatchDelay: cfg.MaxBatchDelay,
 	}
 }
 
@@ -107,7 +102,7 @@ func (m *Manager) getRepository(ctx context.Context, repositoryID graveler.Repos
 
 func (m *Manager) getRepositoryBatch(ctx context.Context, repositoryID graveler.RepositoryID) (*graveler.RepositoryRecord, error) {
 	key := fmt.Sprintf("GetRepository:%s", repositoryID)
-	repository, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
+	repository, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
 		return m.getRepository(context.Background(), repositoryID)
 	}))
 	if err != nil {
@@ -359,7 +354,7 @@ func (m *Manager) getBranchWithPredicate(ctx context.Context, repository *gravel
 		*graveler.Branch
 		kv.Predicate
 	}
-	result, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
+	result, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
 		key := graveler.BranchPath(branchID)
 		data := graveler.BranchData{}
 		pred, err := kv.GetMsg(context.Background(), m.kvStore, graveler.RepoPartition(repository), []byte(key), &data)
 		if err != nil {
@@ -430,7 +425,7 @@ func (m *Manager) GCBranchIterator(ctx context.Context, repository *graveler.Rep
 
 func (m *Manager) GetTag(ctx context.Context, repository *graveler.RepositoryRecord, tagID graveler.TagID) (*graveler.CommitID, error) {
 	key := fmt.Sprintf("GetTag:%s:%s", repository.RepositoryID, tagID)
-	commitID, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
+	commitID, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
 		tagKey := graveler.TagPath(tagID)
 		t := graveler.TagData{}
 		_, err := kv.GetMsg(context.Background(), m.kvStore, graveler.RepoPartition(repository), []byte(tagKey), &t)
@@ -481,7 +476,7 @@ func (m *Manager) GetCommitByPrefix(ctx context.Context, repository *graveler.Re
 		return m.GetCommit(ctx, repository, prefix)
 	}
 	key := fmt.Sprintf("GetCommitByPrefix:%s:%s", repository.RepositoryID, prefix)
-	commit, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
+	commit, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
 		it, err := NewOrderedCommitIterator(context.Background(), m.kvStore, repository, false)
 		if err != nil {
 			return nil, err
 		}
@@ -527,7 +522,7 @@ func (m *Manager) GetCommit(ctx context.Context, repository *graveler.Repository
 
 func (m *Manager) getCommitBatch(ctx context.Context, repository *graveler.RepositoryRecord, commitID graveler.CommitID) (*graveler.Commit, error) {
 	key := fmt.Sprintf("GetCommit:%s:%s", repository.RepositoryID, commitID)
-	commit, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
+	commit, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
 		return m.getCommit(context.Background(), commitID, repository)
 	}))
 	if err != nil {