From 21db7413dbc87d6de14a98e96759f12d28e687d7 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 6 Feb 2025 17:24:49 +0000 Subject: [PATCH 1/6] Add comment --- x-pack/apm-server/sampling/eventstorage/pebble.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/apm-server/sampling/eventstorage/pebble.go b/x-pack/apm-server/sampling/eventstorage/pebble.go index 319836c622d..73158d11758 100644 --- a/x-pack/apm-server/sampling/eventstorage/pebble.go +++ b/x-pack/apm-server/sampling/eventstorage/pebble.go @@ -34,11 +34,12 @@ func eventComparer() *pebble.Comparer { } return comparer.ComparePointSuffixes(a[ap:], b[bp:]) } - comparer.Name = "apmserver.EventComparer" + comparer.Name = "apmserver.EventComparer" // this should stay constant, otherwise existing database won't open return &comparer } func OpenEventPebble(storageDir string) (*pebble.DB, error) { + // Option values are picked and validated in https://github.com/elastic/apm-server/issues/15568 opts := &pebble.Options{ FormatMajorVersion: pebble.FormatColumnarBlocks, Logger: logp.NewLogger(logs.Sampling), @@ -58,6 +59,7 @@ func OpenEventPebble(storageDir string) (*pebble.DB, error) { } func OpenDecisionPebble(storageDir string) (*pebble.DB, error) { + // Option values are picked and validated in https://github.com/elastic/apm-server/issues/15568 return pebble.Open(filepath.Join(storageDir, "decision"), &pebble.Options{ FormatMajorVersion: pebble.FormatColumnarBlocks, Logger: logp.NewLogger(logs.Sampling), From ba1fdd5707a3853bf0bc9f122ac1078329ccd722 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 6 Feb 2025 17:26:42 +0000 Subject: [PATCH 2/6] Increase event block size, explain --- x-pack/apm-server/sampling/eventstorage/pebble.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/apm-server/sampling/eventstorage/pebble.go b/x-pack/apm-server/sampling/eventstorage/pebble.go index 73158d11758..cc442267b7f 100644 --- 
a/x-pack/apm-server/sampling/eventstorage/pebble.go +++ b/x-pack/apm-server/sampling/eventstorage/pebble.go @@ -46,7 +46,7 @@ func OpenEventPebble(storageDir string) (*pebble.DB, error) { MemTableSize: 16 << 20, Levels: []pebble.LevelOptions{ { - BlockSize: 16 << 10, + BlockSize: 32 << 10, // the bigger the blocks, the better the compression and the smaller the index block Compression: func() pebble.Compression { return pebble.SnappyCompression }, FilterPolicy: bloom.FilterPolicy(10), FilterType: pebble.TableFilter, @@ -63,7 +63,7 @@ func OpenDecisionPebble(storageDir string) (*pebble.DB, error) { return pebble.Open(filepath.Join(storageDir, "decision"), &pebble.Options{ FormatMajorVersion: pebble.FormatColumnarBlocks, Logger: logp.NewLogger(logs.Sampling), - MemTableSize: 2 << 20, + MemTableSize: 2 << 20, // big memtables are slow to scan, and significantly slow the hot path Levels: []pebble.LevelOptions{ { BlockSize: 2 << 10, From 71e0ed791f3563c206e9a561bbc90aaea90e5b64 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 6 Feb 2025 17:29:55 +0000 Subject: [PATCH 3/6] Remove useless option --- x-pack/apm-server/sampling/eventstorage/pebble.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/apm-server/sampling/eventstorage/pebble.go b/x-pack/apm-server/sampling/eventstorage/pebble.go index cc442267b7f..5dedcd01895 100644 --- a/x-pack/apm-server/sampling/eventstorage/pebble.go +++ b/x-pack/apm-server/sampling/eventstorage/pebble.go @@ -54,7 +54,6 @@ func OpenEventPebble(storageDir string) (*pebble.DB, error) { }, Comparer: eventComparer(), } - opts.Experimental.MaxWriterConcurrency = 1 // >0 enables parallel writers, the actual value doesn't matter return pebble.Open(filepath.Join(storageDir, "event"), opts) } From 4ae0eec9a5434554ecc41ec7e58f1e33339fa362 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 6 Feb 2025 18:01:29 +0000 Subject: [PATCH 4/6] Configurable database cache size --- internal/beater/beater.go | 8 ++++++++ 
internal/beater/config/sampling.go | 3 +++ x-pack/apm-server/main.go | 8 +++++--- .../apm-server/sampling/eventstorage/pebble.go | 10 ++++++++-- .../sampling/eventstorage/storage_manager.go | 17 +++++++++++++---- 5 files changed, 37 insertions(+), 9 deletions(-) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 802f132e884..ecdd216d76b 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -230,6 +230,14 @@ func (s *Runner) Run(ctx context.Context) error { ) } + if s.config.Sampling.Tail.Enabled { + // Scale the database cache size with the memory limit, e.g. 16MB for 1GB of memory. + s.config.Sampling.Tail.DatabaseCacheSize = uint64(linearScaledValue(8<<20, memLimitGB, 8<<20)) + s.logger.Infof("Sampling.Tail.DatabaseCacheSize set to %d based on %0.1fgb of memory", + s.config.Sampling.Tail.DatabaseCacheSize, memLimitGB, + ) + } + // Send config to telemetry. recordAPMServerConfig(s.config) diff --git a/internal/beater/config/sampling.go b/internal/beater/config/sampling.go index 7986068f251..69e2870e6db 100644 --- a/internal/beater/config/sampling.go +++ b/internal/beater/config/sampling.go @@ -63,6 +63,9 @@ type TailSamplingConfig struct { DiscardOnWriteFailure bool `config:"discard_on_write_failure"` + // DatabaseCacheSize is the cache size, in bytes, for the tail-sampling database.
+ DatabaseCacheSize uint64 `config:"database_cache_size"` + esConfigured bool } diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index 563461c18f9..37f755067f7 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -108,7 +108,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er } storageDir := paths.Resolve(paths.Data, tailSamplingStorageDir) - db, err := getDB(storageDir, args.MeterProvider) + db, err := getDB(storageDir, tailSamplingConfig.DatabaseCacheSize, args.MeterProvider) if err != nil { return nil, fmt.Errorf("failed to get tail-sampling database: %w", err) } @@ -154,11 +154,13 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er }) } -func getDB(storageDir string, mp metric.MeterProvider) (*eventstorage.StorageManager, error) { +func getDB(storageDir string, cacheSize uint64, mp metric.MeterProvider) (*eventstorage.StorageManager, error) { dbMu.Lock() defer dbMu.Unlock() if db == nil { - var opts []eventstorage.StorageManagerOptions + opts := []eventstorage.StorageManagerOptions{ + eventstorage.WithDBCacheSize(cacheSize), + } if mp != nil { opts = append(opts, eventstorage.WithMeterProvider(mp)) } diff --git a/x-pack/apm-server/sampling/eventstorage/pebble.go b/x-pack/apm-server/sampling/eventstorage/pebble.go index 5dedcd01895..0919a2eca62 100644 --- a/x-pack/apm-server/sampling/eventstorage/pebble.go +++ b/x-pack/apm-server/sampling/eventstorage/pebble.go @@ -38,8 +38,10 @@ func eventComparer() *pebble.Comparer { return &comparer } -func OpenEventPebble(storageDir string) (*pebble.DB, error) { +func OpenEventPebble(storageDir string, cacheSize uint64) (*pebble.DB, error) { // Option values are picked and validated in https://github.com/elastic/apm-server/issues/15568 + cache := pebble.NewCache(int64(cacheSize)) + defer cache.Unref() opts := &pebble.Options{ FormatMajorVersion: pebble.FormatColumnarBlocks, Logger: logp.NewLogger(logs.Sampling), @@ -53,12 
+55,15 @@ func OpenEventPebble(storageDir string) (*pebble.DB, error) { }, }, Comparer: eventComparer(), + Cache: cache, } return pebble.Open(filepath.Join(storageDir, "event"), opts) } -func OpenDecisionPebble(storageDir string) (*pebble.DB, error) { +func OpenDecisionPebble(storageDir string, cacheSize uint64) (*pebble.DB, error) { // Option values are picked and validated in https://github.com/elastic/apm-server/issues/15568 + cache := pebble.NewCache(int64(cacheSize)) + defer cache.Unref() return pebble.Open(filepath.Join(storageDir, "decision"), &pebble.Options{ FormatMajorVersion: pebble.FormatColumnarBlocks, Logger: logp.NewLogger(logs.Sampling), @@ -71,5 +76,6 @@ func OpenDecisionPebble(storageDir string) (*pebble.DB, error) { FilterType: pebble.TableFilter, }, }, + Cache: cache, }) } diff --git a/x-pack/apm-server/sampling/eventstorage/storage_manager.go b/x-pack/apm-server/sampling/eventstorage/storage_manager.go index c1d7f221302..cade53545a4 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_manager.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_manager.go @@ -85,6 +85,13 @@ func WithGetDiskUsage(getDiskUsage func() (DiskUsage, error)) StorageManagerOpti } } +// WithDBCacheSize sets the total size, in bytes, of the in-memory cache of all databases managed by StorageManager. +func WithDBCacheSize(size uint64) StorageManagerOptions { + return func(sm *StorageManager) { + sm.dbCacheSize = size + } +} + // DiskUsage is the struct returned by getDiskUsage. type DiskUsage struct { UsedBytes, TotalBytes uint64 @@ -93,8 +100,9 @@ type DiskUsage struct { // StorageManager encapsulates pebble.DB. // It assumes exclusive access to pebble DB at storageDir.
type StorageManager struct { - storageDir string - logger *logp.Logger + storageDir string + dbCacheSize uint64 + logger *logp.Logger eventDB *pebble.DB decisionDB *pebble.DB @@ -146,6 +154,7 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*Stora TotalBytes: usage.TotalBytes, }, err }, + dbCacheSize: 16 << 20, // default to 16MB cache shared between event and decision DB } sm.getDBSize = func() uint64 { return sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage() @@ -176,13 +185,13 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*Stora // reset initializes db and storage. func (sm *StorageManager) reset() error { - eventDB, err := OpenEventPebble(sm.storageDir) + eventDB, err := OpenEventPebble(sm.storageDir, sm.dbCacheSize/2) if err != nil { return fmt.Errorf("open event db error: %w", err) } sm.eventDB = eventDB - decisionDB, err := OpenDecisionPebble(sm.storageDir) + decisionDB, err := OpenDecisionPebble(sm.storageDir, sm.dbCacheSize/2) if err != nil { return fmt.Errorf("open decision db error: %w", err) } From dffd8da77607e388d7f3e38afaa0971e86c4a43d Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 6 Feb 2025 18:20:34 +0000 Subject: [PATCH 5/6] Fix compile error --- x-pack/apm-server/sampling/eventstorage/prefix_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/apm-server/sampling/eventstorage/prefix_test.go b/x-pack/apm-server/sampling/eventstorage/prefix_test.go index b54696e6a3e..54d353b85ae 100644 --- a/x-pack/apm-server/sampling/eventstorage/prefix_test.go +++ b/x-pack/apm-server/sampling/eventstorage/prefix_test.go @@ -18,7 +18,7 @@ import ( ) func newEventPebble(t *testing.T) *pebble.DB { - db, err := eventstorage.OpenEventPebble(t.TempDir()) + db, err := eventstorage.OpenEventPebble(t.TempDir(), 8<<20) require.NoError(t, err) t.Cleanup(func() { db.Close() @@ -27,7 +27,7 @@ func newEventPebble(t *testing.T) *pebble.DB { } func 
newDecisionPebble(t *testing.T) *pebble.DB { - db, err := eventstorage.OpenDecisionPebble(t.TempDir()) + db, err := eventstorage.OpenDecisionPebble(t.TempDir(), 8<<20) require.NoError(t, err) t.Cleanup(func() { db.Close() From 8e0f0152062a5b6b76a36d6c94044b6cdefc0082 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 10 Feb 2025 10:07:27 +0000 Subject: [PATCH 6/6] Temporarily revert values for bench --- x-pack/apm-server/sampling/eventstorage/pebble.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/apm-server/sampling/eventstorage/pebble.go b/x-pack/apm-server/sampling/eventstorage/pebble.go index 0919a2eca62..e1d24b536ac 100644 --- a/x-pack/apm-server/sampling/eventstorage/pebble.go +++ b/x-pack/apm-server/sampling/eventstorage/pebble.go @@ -48,7 +48,7 @@ func OpenEventPebble(storageDir string, cacheSize uint64) (*pebble.DB, error) { MemTableSize: 16 << 20, Levels: []pebble.LevelOptions{ { - BlockSize: 32 << 10, // the bigger the blocks, the better the compression and the smaller the index block + BlockSize: 16 << 10, // the bigger the blocks, the better the compression and the smaller the index block Compression: func() pebble.Compression { return pebble.SnappyCompression }, FilterPolicy: bloom.FilterPolicy(10), FilterType: pebble.TableFilter, @@ -57,6 +57,7 @@ func OpenEventPebble(storageDir string, cacheSize uint64) (*pebble.DB, error) { Comparer: eventComparer(), Cache: cache, } + opts.Experimental.MaxWriterConcurrency = 1 return pebble.Open(filepath.Join(storageDir, "event"), opts) }