Make MaxBatchDelay configurable (#7774)
* Make MaxBatchDelay configurable

* fix
itaiad200 authored May 18, 2024
1 parent 61b6211 commit dc98f49
Showing 5 changed files with 21 additions and 14 deletions.
3 changes: 2 additions & 1 deletion docs/reference/configuration.md
@@ -162,7 +162,8 @@ This reference uses `.` to denote the nesting of values.
* `graveler.commit_cache.size` `(int : 50000)` - How many items to store in the commit cache.
* `graveler.commit_cache.ttl` `(time duration : "10m")` - How long to store an item in the commit cache.
* `graveler.commit_cache.jitter` `(time duration : "2s")` - A random amount of time between 0 and this value is added to each item's TTL.
* `graveler.background.rate_limit` `(int : 0)` - Advence configuration to control background work done rate limit in requests per second (default: 0 - unlimited).
* `graveler.max_batch_delay` `(duration : 3ms)` - Controls the server batching period for references store operations.
* `graveler.background.rate_limit` `(int : 0)` - Requests per seconds limit on background work performed (default: 0 - unlimited), like deleting committed staging tokens.
* `committed.local_cache` - an object describing the local (on-disk) cache of metadata from
permanent storage:
+ `committed.local_cache.size_bytes` (`int` : `1073741824`) - bytes for local cache to use on disk. The cache may use more storage for short periods of time.
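As an illustration of the new key (not part of this commit), here is a minimal sketch of how an operator override would resolve against the 3ms default, assuming the standard viper wiring used in `pkg/config`; the `5ms` override value is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/spf13/viper"
)

func main() {
	// Default as set in pkg/config/defaults.go.
	viper.SetDefault("graveler.max_batch_delay", 3*time.Millisecond)

	// A hypothetical operator override in a YAML config file.
	viper.SetConfigType("yaml")
	_ = viper.ReadConfig(strings.NewReader("graveler:\n  max_batch_delay: 5ms\n"))

	// Resolves to 5ms; without the override it falls back to the 3ms default.
	fmt.Println(viper.GetDuration("graveler.max_batch_delay"))
}
```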
1 change: 1 addition & 0 deletions pkg/catalog/catalog.go
@@ -366,6 +366,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
AddressProvider: addressProvider,
RepositoryCacheConfig: ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
CommitCacheConfig: ref.CacheConfig(cfg.Config.Graveler.CommitCache),
MaxBatchDelay: cfg.Config.Graveler.MaxBatchDelay,
})
gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, cfg.Config.Committed.BlockStoragePrefix)
settingManager := settings.NewManager(refManager, cfg.KVStore)
1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -321,6 +321,7 @@ type Config struct {
Background struct {
RateLimit int `mapstructure:"rate_limit"`
} `mapstructure:"background"`
MaxBatchDelay time.Duration `mapstructure:"max_batch_delay"`
} `mapstructure:"graveler"`
Gateways struct {
S3 struct {
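For context (again not from the commit), a minimal sketch of how a string such as "3ms" is decoded into the new `time.Duration` field via the `mapstructure:"max_batch_delay"` tag, assuming the duration decode hook that viper applies by default when unmarshalling:

```go
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

type gravelerConfig struct {
	MaxBatchDelay time.Duration `mapstructure:"max_batch_delay"`
}

func main() {
	var cfg gravelerConfig
	dec, _ := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// The same hook viper uses by default to turn "3ms" into a time.Duration.
		DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
		Result:     &cfg,
	})
	_ = dec.Decode(map[string]interface{}{"max_batch_delay": "3ms"})
	fmt.Println(cfg.MaxBatchDelay) // 3ms
}
```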
9 changes: 9 additions & 0 deletions pkg/config/defaults.go
@@ -137,6 +137,15 @@ func setDefaults(cfgType string) {
viper.SetDefault("graveler.commit_cache.expiry", 10*time.Minute)
viper.SetDefault("graveler.commit_cache.jitter", 2*time.Second)

// MaxBatchDelay - 3ms was chosen as a max delay time for critical path queries.
// It trades off amount of queries per second (and thus effectiveness of the batching mechanism) with added latency.
// Since reducing # of expensive operations is only beneficial when there are a lot of concurrent requests,
//
// the sweet spot is probably between 1-5 milliseconds (representing 200-1000 requests/second to the data store).
//
// 3ms of delay with ~300 requests/second per resource sounds like a reasonable tradeoff.
viper.SetDefault("graveler.max_batch_delay", 3*time.Millisecond)

viper.SetDefault("ugc.prepare_interval", time.Minute)
viper.SetDefault("ugc.prepare_max_file_size", 20*1024*1024)

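A rough back-of-the-envelope model of the tradeoff described in the comment above (the model and the 300 requests/second figure are illustrative, not from the commit): if requests for one hot key arrive at rate r and every batch waits d before executing, each batch coalesces roughly 1 + r*d callers, so the backing store sees about r / (1 + r*d) queries per second for that key, at the cost of up to d extra latency per request.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical arrival rate for one hot key (e.g. "GetRepository:<repo>").
	const rate = 300.0 // requests per second

	for _, d := range []time.Duration{1 * time.Millisecond, 3 * time.Millisecond, 5 * time.Millisecond} {
		perBatch := 1 + rate*d.Seconds() // callers coalesced into one store query
		storeQPS := rate / perBatch      // resulting queries per second to the store
		fmt.Printf("delay=%v  ~%.1f callers/batch  ~%.0f store queries/s  (+%v worst-case latency)\n",
			d, perBatch, storeQPS, d)
	}
}
```

With the comment's numbers (about 300 requests/second and a 3ms window) this works out to roughly two callers per batch, i.e. roughly half the load on the store, which is the "reasonable tradeoff" the comment refers to.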
21 changes: 8 additions & 13 deletions pkg/graveler/ref/manager.go
@@ -17,14 +17,6 @@ import (
)

const (
// MaxBatchDelay - 3ms was chosen as a max delay time for critical path queries.
// It trades off amount of queries per second (and thus effectiveness of the batching mechanism) with added latency.
// Since reducing # of expensive operations is only beneficial when there are a lot of concurrent requests,
//
// the sweet spot is probably between 1-5 milliseconds (representing 200-1000 requests/second to the data store).
//
// 3ms of delay with ~300 requests/second per resource sounds like a reasonable tradeoff.
MaxBatchDelay = 3 * time.Millisecond
// commitIDStringLength string representation length of commit ID - based on hex representation of sha256
commitIDStringLength = 64
// ImportExpiryTime Expiry time to remove imports from ref-store
@@ -44,6 +36,7 @@ type Manager struct {
batchExecutor batch.Batcher
repoCache cache.Cache
commitCache cache.Cache
maxBatchDelay time.Duration
}

func branchFromProto(pb *graveler.BranchData) *graveler.Branch {
@@ -80,6 +73,7 @@ type ManagerConfig struct {
AddressProvider ident.AddressProvider
RepositoryCacheConfig CacheConfig
CommitCacheConfig CacheConfig
MaxBatchDelay time.Duration
}

func NewRefManager(cfg ManagerConfig) *Manager {
@@ -90,6 +84,7 @@ func NewRefManager(cfg ManagerConfig) *Manager {
batchExecutor: cfg.Executor,
repoCache: newCache(cfg.RepositoryCacheConfig),
commitCache: newCache(cfg.CommitCacheConfig),
maxBatchDelay: cfg.MaxBatchDelay,
}
}

@@ -107,7 +102,7 @@ func (m *Manager) getRepository(ctx context.Context, repositoryID graveler.Repos

func (m *Manager) getRepositoryBatch(ctx context.Context, repositoryID graveler.RepositoryID) (*graveler.RepositoryRecord, error) {
key := fmt.Sprintf("GetRepository:%s", repositoryID)
repository, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
repository, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
return m.getRepository(context.Background(), repositoryID)
}))
if err != nil {
@@ -359,7 +354,7 @@ func (m *Manager) getBranchWithPredicate(ctx context.Context, repository *gravel
*graveler.Branch
kv.Predicate
}
result, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
result, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
key := graveler.BranchPath(branchID)
data := graveler.BranchData{}
pred, err := kv.GetMsg(context.Background(), m.kvStore, graveler.RepoPartition(repository), []byte(key), &data)
@@ -430,7 +425,7 @@ func (m *Manager) GCBranchIterator(ctx context.Context, repository *graveler.Rep

func (m *Manager) GetTag(ctx context.Context, repository *graveler.RepositoryRecord, tagID graveler.TagID) (*graveler.CommitID, error) {
key := fmt.Sprintf("GetTag:%s:%s", repository.RepositoryID, tagID)
commitID, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
commitID, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
tagKey := graveler.TagPath(tagID)
t := graveler.TagData{}
_, err := kv.GetMsg(context.Background(), m.kvStore, graveler.RepoPartition(repository), []byte(tagKey), &t)
@@ -481,7 +476,7 @@ func (m *Manager) GetCommitByPrefix(ctx context.Context, repository *graveler.Re
return m.GetCommit(ctx, repository, prefix)
}
key := fmt.Sprintf("GetCommitByPrefix:%s:%s", repository.RepositoryID, prefix)
commit, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
commit, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
it, err := NewOrderedCommitIterator(context.Background(), m.kvStore, repository, false)
if err != nil {
return nil, err
@@ -527,7 +522,7 @@ func (m *Manager) GetCommit(ctx context.Context, repository *graveler.Repository

func (m *Manager) getCommitBatch(ctx context.Context, repository *graveler.RepositoryRecord, commitID graveler.CommitID) (*graveler.Commit, error) {
key := fmt.Sprintf("GetCommit:%s:%s", repository.RepositoryID, commitID)
commit, err := m.batchExecutor.BatchFor(ctx, key, MaxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
commit, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
return m.getCommit(context.Background(), commitID, repository)
}))
if err != nil {
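To make the role of the now-configurable delay concrete, below is a deliberately simplified, self-contained sketch of the coalescing pattern that BatchFor is used for in this file. It is not the pkg/batch implementation; the toyBatcher type, the fetch callback, and the 100-goroutine driver are invented for illustration. The first caller for a key opens a batch, waits out the delay window, and then performs the expensive lookup once on behalf of every caller that joined during the window.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// toyBatcher is a simplified stand-in for the batching executor used by the
// ref manager: callers asking for the same key within the delay window share
// a single underlying fetch.
type toyBatcher struct {
	mu       sync.Mutex
	inFlight map[string]*call
	delay    time.Duration // the knob this commit makes configurable
}

type call struct {
	done chan struct{}
	val  interface{}
	err  error
}

func newToyBatcher(delay time.Duration) *toyBatcher {
	return &toyBatcher{inFlight: map[string]*call{}, delay: delay}
}

func (b *toyBatcher) BatchFor(key string, fetch func() (interface{}, error)) (interface{}, error) {
	b.mu.Lock()
	if c, ok := b.inFlight[key]; ok {
		// A batch for this key is already open: join it and wait for its result.
		b.mu.Unlock()
		<-c.done
		return c.val, c.err
	}
	c := &call{done: make(chan struct{})}
	b.inFlight[key] = c
	b.mu.Unlock()

	// The batching window: a longer delay coalesces more callers but adds latency.
	time.Sleep(b.delay)

	b.mu.Lock()
	delete(b.inFlight, key) // callers arriving from now on start a new batch
	b.mu.Unlock()

	c.val, c.err = fetch() // one expensive lookup on behalf of every joined caller
	close(c.done)
	return c.val, c.err
}

func main() {
	var queries int64
	b := newToyBatcher(3 * time.Millisecond)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { // 100 concurrent lookups of the same repository
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = b.BatchFor("GetRepository:my-repo", func() (interface{}, error) {
				atomic.AddInt64(&queries, 1) // count how often the "store" is actually hit
				return "repository record", nil
			})
		}()
	}
	wg.Wait()
	fmt.Printf("100 callers, %d underlying queries\n", atomic.LoadInt64(&queries))
}
```

In this toy, shrinking the delay toward zero leaves callers little time to coalesce, so most of them hit the store themselves; that is the tradeoff the original comment, now moved to defaults.go, describes.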
