Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TBS: update pebble options #15579

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ func (s *Runner) Run(ctx context.Context) error {
)
}

if s.config.Sampling.Tail.Enabled {
// 16MB for 1GB
s.config.Sampling.Tail.DatabaseCacheSize = uint64(linearScaledValue(8<<20, memLimitGB, 8<<20))
s.logger.Infof("Sampling.Tail.DatabaseCacheSize set to %d based on %0.1fgb of memory",
s.config.Sampling.Tail.DatabaseCacheSize, memLimitGB,
)
}

// Send config to telemetry.
recordAPMServerConfig(s.config)

Expand Down
3 changes: 3 additions & 0 deletions internal/beater/config/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type TailSamplingConfig struct {

DiscardOnWriteFailure bool `config:"discard_on_write_failure"`

// DatabaseCacheSize is cache size in bytes for tail-sampling database.
DatabaseCacheSize uint64 `config:"database_cache_size"`

esConfigured bool
}

Expand Down
8 changes: 5 additions & 3 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
}

storageDir := paths.Resolve(paths.Data, tailSamplingStorageDir)
db, err := getDB(storageDir, args.MeterProvider)
db, err := getDB(storageDir, tailSamplingConfig.DatabaseCacheSize, args.MeterProvider)
if err != nil {
return nil, fmt.Errorf("failed to get tail-sampling database: %w", err)
}
Expand Down Expand Up @@ -154,11 +154,13 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
})
}

func getDB(storageDir string, mp metric.MeterProvider) (*eventstorage.StorageManager, error) {
func getDB(storageDir string, cacheSize uint64, mp metric.MeterProvider) (*eventstorage.StorageManager, error) {
dbMu.Lock()
defer dbMu.Unlock()
if db == nil {
var opts []eventstorage.StorageManagerOptions
opts := []eventstorage.StorageManagerOptions{
eventstorage.WithDBCacheSize(cacheSize),
}
if mp != nil {
opts = append(opts, eventstorage.WithMeterProvider(mp))
}
Expand Down
20 changes: 14 additions & 6 deletions x-pack/apm-server/sampling/eventstorage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,41 @@ func eventComparer() *pebble.Comparer {
}
return comparer.ComparePointSuffixes(a[ap:], b[bp:])
}
comparer.Name = "apmserver.EventComparer"
comparer.Name = "apmserver.EventComparer" // this should stay constant, otherwise existing database won't open
return &comparer
}

func OpenEventPebble(storageDir string) (*pebble.DB, error) {
func OpenEventPebble(storageDir string, cacheSize uint64) (*pebble.DB, error) {
// Option values are picked and validated in https://github.com/elastic/apm-server/issues/15568
cache := pebble.NewCache(int64(cacheSize))
defer cache.Unref()
opts := &pebble.Options{
FormatMajorVersion: pebble.FormatColumnarBlocks,
Logger: logp.NewLogger(logs.Sampling),
MemTableSize: 16 << 20,
Levels: []pebble.LevelOptions{
{
BlockSize: 16 << 10,
BlockSize: 16 << 10, // the bigger the blocks, the better the compression and the smaller the index block
Compression: func() pebble.Compression { return pebble.SnappyCompression },
FilterPolicy: bloom.FilterPolicy(10),
FilterType: pebble.TableFilter,
},
},
Comparer: eventComparer(),
Cache: cache,
}
opts.Experimental.MaxWriterConcurrency = 1 // >0 enables parallel writers, the actual value doesn't matter
opts.Experimental.MaxWriterConcurrency = 1
return pebble.Open(filepath.Join(storageDir, "event"), opts)
}

func OpenDecisionPebble(storageDir string) (*pebble.DB, error) {
func OpenDecisionPebble(storageDir string, cacheSize uint64) (*pebble.DB, error) {
// Option values are picked and validated in https://github.com/elastic/apm-server/issues/15568
cache := pebble.NewCache(int64(cacheSize))
defer cache.Unref()
return pebble.Open(filepath.Join(storageDir, "decision"), &pebble.Options{
FormatMajorVersion: pebble.FormatColumnarBlocks,
Logger: logp.NewLogger(logs.Sampling),
MemTableSize: 2 << 20,
MemTableSize: 2 << 20, // big memtables are slow to scan, and significantly slow the hot path
Levels: []pebble.LevelOptions{
{
BlockSize: 2 << 10,
Expand All @@ -70,5 +77,6 @@ func OpenDecisionPebble(storageDir string) (*pebble.DB, error) {
FilterType: pebble.TableFilter,
},
},
Cache: cache,
})
}
4 changes: 2 additions & 2 deletions x-pack/apm-server/sampling/eventstorage/prefix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func newEventPebble(t *testing.T) *pebble.DB {
db, err := eventstorage.OpenEventPebble(t.TempDir())
db, err := eventstorage.OpenEventPebble(t.TempDir(), 8<<20)
require.NoError(t, err)
t.Cleanup(func() {
db.Close()
Expand All @@ -27,7 +27,7 @@ func newEventPebble(t *testing.T) *pebble.DB {
}

func newDecisionPebble(t *testing.T) *pebble.DB {
db, err := eventstorage.OpenDecisionPebble(t.TempDir())
db, err := eventstorage.OpenDecisionPebble(t.TempDir(), 8<<20)
require.NoError(t, err)
t.Cleanup(func() {
db.Close()
Expand Down
17 changes: 13 additions & 4 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ func WithGetDiskUsage(getDiskUsage func() (DiskUsage, error)) StorageManagerOpti
}
}

// WithDBCacheSize sets the total size in bytes of in-memory cache of all databases managed by StorageManager.
func WithDBCacheSize(size uint64) StorageManagerOptions {
return func(sm *StorageManager) {
sm.dbCacheSize = size
}
}

// DiskUsage is the struct returned by getDiskUsage.
type DiskUsage struct {
UsedBytes, TotalBytes uint64
Expand All @@ -93,8 +100,9 @@ type DiskUsage struct {
// StorageManager encapsulates pebble.DB.
// It assumes exclusive access to pebble DB at storageDir.
type StorageManager struct {
storageDir string
logger *logp.Logger
storageDir string
dbCacheSize uint64
logger *logp.Logger

eventDB *pebble.DB
decisionDB *pebble.DB
Expand Down Expand Up @@ -146,6 +154,7 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*Stora
TotalBytes: usage.TotalBytes,
}, err
},
dbCacheSize: 16 << 20, // default to 16MB cache shared between event and decision DB
}
sm.getDBSize = func() uint64 {
return sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage()
Expand Down Expand Up @@ -176,13 +185,13 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*Stora

// reset initializes db and storage.
func (sm *StorageManager) reset() error {
eventDB, err := OpenEventPebble(sm.storageDir)
eventDB, err := OpenEventPebble(sm.storageDir, sm.dbCacheSize/2)
if err != nil {
return fmt.Errorf("open event db error: %w", err)
}
sm.eventDB = eventDB

decisionDB, err := OpenDecisionPebble(sm.storageDir)
decisionDB, err := OpenDecisionPebble(sm.storageDir, sm.dbCacheSize/2)
if err != nil {
return fmt.Errorf("open decision db error: %w", err)
}
Expand Down
Loading