diff --git a/tsdb/config.go b/tsdb/config.go
index feb4927783a..0490934f1bb 100644
--- a/tsdb/config.go
+++ b/tsdb/config.go
@@ -52,6 +52,10 @@ const (
 	// block in a TSM file
 	DefaultMaxPointsPerBlock = 1000
 
+	// AggressiveMaxPointsPerBlock is used when we want to further compact blocks.
+	// It is 100 times the default number of points per block.
+	AggressiveMaxPointsPerBlock = DefaultMaxPointsPerBlock * 100
+
 	// DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database.
 	// This limit only applies to the "inmem" index.
 	DefaultMaxSeriesPerDatabase = 1000000
@@ -77,8 +81,20 @@ const (
 	// partition snapshot compactions that can run at one time.
 	// A value of 0 results in runtime.GOMAXPROCS(0).
 	DefaultSeriesFileMaxConcurrentSnapshotCompactions = 0
+
+	// MaxTSMFileSize is the maximum size of TSM files.
+	MaxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
 )
 
+var SingleGenerationReasonText string = SingleGenerationReason()
+
+// SingleGenerationReason builds the log message reported when a single-generation
+// shard is checked for full compaction and found to need further work.
+// 1048576000 (1000 * 1024 * 1024) is the byte count used here to approximate one gigabyte.
+func SingleGenerationReason() string {
+	return fmt.Sprintf("not fully compacted and not idle because single generation with more than 2 files under %d GB and more than 1 file(s) under aggressive compaction points per block count (%d points)", int(MaxTSMFileSize/1048576000), AggressiveMaxPointsPerBlock)
+}
+
 // Config holds the configuration for the tsbd package.
 type Config struct {
 	Dir string `toml:"dir"`
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index 6da71e136cf..ccd3b1066cd 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -33,7 +33,6 @@ import (
 	"go.uber.org/zap"
 )
 
-const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
 const logEvery = 2 * DefaultSegmentSize
 
 const (
@@ -101,7 +100,13 @@ type CompactionGroup []string
 type CompactionPlanner interface {
 	Plan(lastWrite time.Time) ([]CompactionGroup, int64)
 	PlanLevel(level int) ([]CompactionGroup, int64)
-	PlanOptimize() ([]CompactionGroup, int64)
+	// PlanOptimize returns the compaction groups, the number of compaction groups,
+	// and the number of generations covered by those groups.
+	// generationCount is used to decide how many points per block to use during compaction.
+	// The value is mostly ignored in normal compaction code paths, but
+	// for the edge case where there is a single generation with many
+	// files under 2 GB it is an important indicator.
+	PlanOptimize() (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64)
 	Release(group []CompactionGroup)
 	FullyCompacted() (bool, string)
 
@@ -230,6 +235,27 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) {
 	} else if gens.hasTombstones() {
 		return false, "not fully compacted and not idle because of tombstones"
 	} else {
+		// For planning we want to ensure that a single-generation shard with
+		// many files under 2 GB, many of which are not yet at the aggressive
+		// compaction points per block count (100,000), is compacted further.
+		// It is okay to stop compaction if there are many files under 2 GB
+		// that are already at the aggressive points per block count.
+ if len(gens) == 1 && len(gens[0].files) > 1 { + aggressivePointsPerBlockCount := 0 + filesUnderMaxTsmSizeCount := 0 + for _, tsmFile := range gens[0].files { + if c.FileStore.BlockCount(tsmFile.Path, 1) >= tsdb.AggressiveMaxPointsPerBlock { + aggressivePointsPerBlockCount++ + } + if tsmFile.Size < tsdb.MaxTSMFileSize { + filesUnderMaxTsmSizeCount++ + } + } + + if filesUnderMaxTsmSizeCount > 1 && aggressivePointsPerBlockCount < len(gens[0].files) { + return false, tsdb.SingleGenerationReasonText + } + } return true, "" } } @@ -340,13 +366,13 @@ func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) { // PlanOptimize returns all TSM files if they are in different generations in order // to optimize the index across TSM files. Each returned compaction group can be // compacted concurrently. -func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { +func (c *DefaultPlanner) PlanOptimize() (compactGroup []CompactionGroup, compactionGroupLen int64, generationCount int64) { // If a full plan has been requested, don't plan any levels which will prevent // the full plan from acquiring them. c.mu.RLock() if c.forceFull { c.mu.RUnlock() - return nil, 0 + return nil, 0, 0 } c.mu.RUnlock() @@ -354,11 +380,10 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { // a generation conceptually as a single file even though it may be // split across several files in sequence. generations := c.findGenerations(true) + fullyCompacted, _ := c.FullyCompacted() - // If there is only one generation and no tombstones, then there's nothing to - // do. - if len(generations) <= 1 && !generations.hasTombstones() { - return nil, 0 + if fullyCompacted { + return nil, 0, 0 } // Group each generation by level such that two adjacent generations in the same @@ -368,11 +393,6 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { for i := 0; i < len(generations); i++ { cur := generations[i] - // Skip the file if it's over the max size and contains a full block and it does not have any tombstones - if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() { - continue - } - // See if this generation is orphan'd which would prevent it from being further // compacted until a final full compactin runs. if i < len(generations)-1 { @@ -382,7 +402,7 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { } } - if len(currentGen) == 0 || currentGen.level() == cur.level() { + if len(currentGen) == 0 || currentGen.level() >= cur.level() { currentGen = append(currentGen, cur) continue } @@ -397,21 +417,21 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { } // Only optimize level 4 files since using lower-levels will collide - // with the level planners + // with the level planners. If this is a single generation optimization + // do not skip any levels. var levelGroups []tsmGenerations - for _, cur := range groups { - if cur.level() == 4 { - levelGroups = append(levelGroups, cur) + if len(generations) == 1 { + levelGroups = append(levelGroups, groups...) 
+ } else { + for _, cur := range groups { + if cur.level() == 4 { + levelGroups = append(levelGroups, cur) + } } } var cGroups []CompactionGroup for _, group := range levelGroups { - // Skip the group if it's not worthwhile to optimize it - if len(group) < 4 && !group.hasTombstones() { - continue - } - var cGroup CompactionGroup for _, gen := range group { for _, file := range gen.files { @@ -423,10 +443,10 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { } if !c.acquire(cGroups) { - return nil, int64(len(cGroups)) + return nil, int64(len(cGroups)), int64(len(generations)) } - return cGroups, int64(len(cGroups)) + return cGroups, int64(len(cGroups)), int64(len(generations)) } // Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns @@ -454,7 +474,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) { var skip bool // Skip the file if it's over the max size and contains a full block and it does not have any tombstones - if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !group.hasTombstones() { + if len(generations) > 2 && group.size() > uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) >= tsdb.DefaultMaxPointsPerBlock && !group.hasTombstones() { skip = true } @@ -530,7 +550,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) { // Skip the file if it's over the max size and contains a full block or the generation is split // over multiple files. In the latter case, that would mean the data in the file spilled over // the 2GB limit. - if g.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock { + if g.size() > uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) >= tsdb.DefaultMaxPointsPerBlock { start = i + 1 } @@ -574,7 +594,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) { } // Skip the file if it's over the max size and it contains a full block - if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() { + if gen.size() >= uint64(tsdb.MaxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) >= tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() { startIndex++ continue } @@ -910,6 +930,10 @@ func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, e // compact writes multiple smaller TSM files into 1 or more larger files. func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) { + // Sets the points per block size. The larger this value is set + // the more points there will be in a single index. Under normal + // conditions this should always be 1000 but there is an edge case + // where this is increased. size := c.Size if size <= 0 { size = tsdb.DefaultMaxPointsPerBlock @@ -1199,7 +1223,7 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger * // If we're over maxTSMFileSize, close out the file // and return the error. 
- if w.Size() > maxTSMFileSize { + if w.Size() > tsdb.MaxTSMFileSize { if err := w.WriteIndex(); err != nil { return false, err } diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index ccfa6d4f469..40bc8cbdf59 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -3,6 +3,7 @@ package tsm1_test import ( "errors" "fmt" + "golang.org/x/exp/slices" "io/fs" "math" "os" @@ -15,6 +16,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -2258,186 +2260,532 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) { ) expFiles := []tsm1.FileStat{} - tsm, pLen := cp.PlanOptimize() + tsm, pLen, gLen := cp.PlanOptimize() if exp, got := len(expFiles), len(tsm); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } else if pLen != int64(len(tsm)) { t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) + } else if gLen != int64(3) { + t.Fatalf("generation len plan mismatch: got %v, exp %v", gLen, 3) } } -func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) { - data := []tsm1.FileStat{ - { - Path: "01-04.tsm1", - Size: 251 * 1024 * 1024, - }, - { - Path: "02-04.tsm1", - Size: 1 * 1024 * 1024, - }, - { - Path: "03-04.tsm1", - Size: 1 * 1024 * 1024, - }, - { - Path: "04-04.tsm1", - Size: 1 * 1024 * 1024, - }, - { - Path: "05-03.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - { - Path: "06-04.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - { - Path: "07-03.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - } - - cp := tsm1.NewDefaultPlanner( - &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return data +func TestDefaultPlanner_PlanOptimize_Test(t *testing.T) { + type PlanOptimizeTests struct { + name string + fs []tsm1.FileStat + bc []int + expectedFullyCompactedReasonExp string + expectedgenerationCount int64 + } + + furtherCompactedTests := []PlanOptimizeTests{ + // Large multi generation group with files at and under 2GB + { + "Large multi generation group with files at and under 2GB", + []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-08.tsm1", + Size: 1048 * 1024 * 1024, + }, + { + Path: "02-05.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-06.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-07.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-08.tsm1", + Size: 1048 * 1024 * 1024, + }, + { + Path: "03-04.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "03-05.tsm1", + Size: 512 * 1024 * 1024, + }, }, - }, tsdb.DefaultCompactFullWriteColdDuration, - ) - - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5]} - tsm, pLen := cp.PlanOptimize() - if exp, got := 1, len(tsm); exp != got { - t.Fatalf("group length mismatch: got %v, exp %v", got, exp) - } else if pLen != int64(len(tsm)) { - t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) - } - - if exp, got := len(expFiles1), len(tsm[0]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } - - for i, p := range expFiles1 { - if got, exp := tsm[0][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } - } -} - -func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) { - data := []tsm1.FileStat{ - { - Path: 
"01-04.tsm1", - Size: 251 * 1024 * 1024, - }, - { - Path: "02-04.tsm1", - Size: 1 * 1024 * 1024, - }, - { - Path: "03-04.tsm1", - Size: 1 * 1024 * 1024, - }, - { - Path: "04-04.tsm1", - Size: 1 * 1024 * 1024, - }, - { - Path: "05-03.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - { - Path: "06-03.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - { - Path: "07-04.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - { - Path: "08-04.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - { - Path: "09-04.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - { - Path: "10-04.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - } - - cp := tsm1.NewDefaultPlanner( - &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return data + []int{}, + "not fully compacted and not idle because of more than one generation", + 3, + }, + // ~650mb group size + { + "Small group size with single generation", + []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 300 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 200 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 100 * 1024 * 1024, + }, + { + Path: "01-08.tsm1", + Size: 50 * 1024 * 1024, + }, }, - }, tsdb.DefaultCompactFullWriteColdDuration, - ) + []int{}, + tsdb.SingleGenerationReasonText, + 1, + }, + // ~650 MB total group size with generations under 4 + { + "Small group size with single generation and levels under 4", + []tsm1.FileStat{ + { + Path: "01-02.tsm1", + Size: 300 * 1024 * 1024, + }, + { + Path: "01-03.tsm1", + Size: 200 * 1024 * 1024, + }, + { + Path: "01-04.tsm1", + Size: 100 * 1024 * 1024, + }, + }, + []int{}, + tsdb.SingleGenerationReasonText, + 1, + }, + { + "Small group size with single generation all at DefaultMaxPointsPerBlock", + []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 300 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 200 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 100 * 1024 * 1024, + }, + { + Path: "01-08.tsm1", + Size: 50 * 1024 * 1024, + }, + }, []int{tsdb.DefaultMaxPointsPerBlock, tsdb.DefaultMaxPointsPerBlock, tsdb.DefaultMaxPointsPerBlock, tsdb.DefaultMaxPointsPerBlock}, + tsdb.SingleGenerationReasonText, + 1, + }, + // > 2 GB total group size + // 50% of files are at aggressive max block size + { + "Small group size with single generation 50% at DefaultMaxPointsPerBlock and 50% at AggressiveMaxPointsPerBlock", + []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 700 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 500 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 400 * 1024 * 1024, + }, + { + Path: "01-08.tsm1", + Size: 300 * 1024 * 1024, + }, + { + Path: "01-09.tsm1", + Size: 200 * 1024 * 1024, + }, + { + Path: "01-10.tsm1", + Size: 100 * 1024 * 1024, + }, + { + Path: "01-11.tsm1", + Size: 50 * 1024 * 1024, + }, + { + Path: "01-12.tsm1", + Size: 25 * 1024 * 1024, + }, + }, + []int{ + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + }, + tsdb.SingleGenerationReasonText, + 1, + }, + { + "Group size over 2GB with single generation", + []tsm1.FileStat{ + { + Path: "01-13.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-14.tsm1", + Size: 650 * 1024 * 1024, + }, + { + Path: "01-15.tsm1", + Size: 450 * 1024 * 1024, + }, + }, []int{ + tsdb.AggressiveMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + }, + tsdb.SingleGenerationReasonText, + 1, + }, + { + // 
Last files are lower than first files generations + // Mix of less than 4 level files and > level 4 files + "Generations with files under level 4", + []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-08.tsm1", + Size: 1048 * 1024 * 1024, + }, + { + Path: "02-05.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-06.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-07.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-08.tsm1", + Size: 1048 * 1024 * 1024, + }, + { + Path: "03-03.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "03-04.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "03-04.tsm1", + Size: 600 * 1024 * 1024, + }, + { + Path: "03-06.tsm1", + Size: 500 * 1024 * 1024, + }, + }, []int{}, "not fully compacted and not idle because of more than one generation", 3, + }, + { + // This test will mock a 'backfill' condition where we have a single + // shard with many generations. The initial generation should be fully + // compacted, but we have some new generations that are not. We need to ensure + // the optimize planner will pick these up and compact everything together. + "Backfill mock condition", + []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-04.tsm1", + Size: 700 * 1024 * 1024, + }, + { + Path: "02-05.tsm1", + Size: 500 * 1024 * 1024, + }, + { + Path: "02-06.tsm1", + Size: 400 * 1024 * 1024, + }, + { + Path: "03-02.tsm1", + Size: 700 * 1024 * 1024, + }, + { + Path: "03-03.tsm1", + Size: 500 * 1024 * 1024, + }, + { + Path: "03-04.tsm1", + Size: 400 * 1024 * 1024, + }, + { + Path: "04-01.tsm1", + Size: 700 * 1024 * 1024, + }, + { + Path: "04-02.tsm1", + Size: 500 * 1024 * 1024, + }, + { + Path: "03-03.tsm1", + Size: 400 * 1024 * 1024, + }, + }, []int{ + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + + tsdb.DefaultMaxPointsPerBlock, + // Use some magic numbers but these are just small values for block counts + 100, + 10, + }, + "not fully compacted and not idle because of more than one generation", + 4, + }, + } + + expectedNotFullyCompacted := func(cp *tsm1.DefaultPlanner, reasonExp string, generationCountExp int64) { + compacted, reason := cp.FullyCompacted() + require.Equal(t, reason, reasonExp, "fullyCompacted reason") + require.False(t, compacted, "is fully compacted") + + _, cgLen := cp.PlanLevel(1) + require.Zero(t, cgLen, "compaction group length; PlanLevel(1)") + _, cgLen = cp.PlanLevel(2) + require.Zero(t, cgLen, "compaction group length; PlanLevel(2)") + _, cgLen = cp.PlanLevel(3) + require.Zero(t, cgLen, "compaction group length; PlanLevel(3)") + + tsmP, pLenP := cp.Plan(time.Now().Add(-time.Second)) + require.Zero(t, len(tsmP), "compaction group; Plan()") + require.Zero(t, pLenP, "compaction group length; Plan()") + + _, cgLen, genLen := cp.PlanOptimize() + require.Equal(t, int64(1), cgLen, "compaction group length") + require.Equal(t, generationCountExp, genLen, "generation count") + + } + + for _, test := range furtherCompactedTests { + t.Run(test.name, func(t 
*testing.T) { + ffs := &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return test.fs + }, + } - expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} - expFiles2 := []tsm1.FileStat{data[6], data[7], data[8], data[9]} + if len(test.bc) > 0 { + err := ffs.SetBlockCounts(test.bc) + require.NoError(t, err, "setting block counts") + } - tsm, pLen := cp.PlanOptimize() - if exp, got := 2, len(tsm); exp != got { - t.Fatalf("group length mismatch: got %v, exp %v", got, exp) - } else if pLen != int64(len(tsm)) { - t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) - } + cp := tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration) + expectedNotFullyCompacted(cp, test.expectedFullyCompactedReasonExp, test.expectedgenerationCount) - if exp, got := len(expFiles1), len(tsm[0]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } + // Reverse test files and re-run tests + slices.Reverse(test.fs) + if len(test.bc) > 0 { + slices.Reverse(test.bc) + err := ffs.SetBlockCounts(test.bc) + require.NoError(t, err, "setting reverse block counts") + } - for i, p := range expFiles1 { - if got, exp := tsm[0][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } + cp = tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration) + expectedNotFullyCompacted(cp, test.expectedFullyCompactedReasonExp, test.expectedgenerationCount) + }) } - if exp, got := len(expFiles2), len(tsm[1]); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } + areFullyCompactedTests := []PlanOptimizeTests{ + { + // This test is added to account for halting state after + // TestDefaultPlanner_FullyCompacted_SmallSingleGeneration + // will need to ensure that once we have single TSM file under 2 GB we stop + "Single TSM file", + []tsm1.FileStat{ + { + Path: "01-09.tsm1", + Size: 650 * 1024 * 1024, + }, + }, + []int{}, + "", 0, + }, + { + // This test is added to account for a single generation that has a group size + // over 2 GB with 1 file under 2 GB all at max points per block with aggressive compaction. + // It should not compact any further. + "TSM files at AggressiveMaxPointsPerBlock", + []tsm1.FileStat{ + { + Path: "01-13.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-14.tsm1", + Size: 691 * 1024 * 1024, + }, + }, []int{ + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + }, "", 0, + }, + { + // This test is added to account for a single generation that has a group size + // over 2 GB at max points per block with aggressive compaction, and, 1 file + // under 2 GB at default max points per block. + // It should not compact any further. + "TSM files cannot compact further, single file under 2G and at DefaultMaxPointsPerBlock", + []tsm1.FileStat{ + { + Path: "01-13.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-14.tsm1", + Size: 691 * 1024 * 1024, + }, + }, []int{ + tsdb.AggressiveMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + }, + "", + 0, + }, + { + // This test is added to account for a single generation that has a group size + // over 2 GB and multiple files under 2 GB all at max points per block for aggressive compaction. 
+ "Group size over 2 with multiple files under 2GB and at AggressiveMaxPointsPerBlock", + []tsm1.FileStat{ + { + Path: "01-13.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-14.tsm1", + Size: 650 * 1024 * 1024, + }, + { + Path: "01-15.tsm1", + Size: 450 * 1024 * 1024, + }, + }, []int{ + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + tsdb.AggressiveMaxPointsPerBlock, + }, "", 0, + }, + } + + expectedFullyCompacted := func(cp *tsm1.DefaultPlanner, reasonExp string) { + compacted, reason := cp.FullyCompacted() + require.Equal(t, reason, reasonExp, "fullyCompacted reason") + require.True(t, compacted, "is fully compacted") + + _, cgLen := cp.PlanLevel(1) + require.Zero(t, cgLen, "compaction group length; PlanLevel(1)") + _, cgLen = cp.PlanLevel(2) + require.Zero(t, cgLen, "compaction group length; PlanLevel(2)") + _, cgLen = cp.PlanLevel(3) + require.Zero(t, cgLen, "compaction group length; PlanLevel(3)") + + tsmP, pLenP := cp.Plan(time.Now().Add(-time.Second)) + require.Zero(t, len(tsmP), "compaction group; Plan()") + require.Zero(t, pLenP, "compaction group length; Plan()") + + cgroup, cgLen, genLen := cp.PlanOptimize() + require.Equal(t, []tsm1.CompactionGroup(nil), cgroup, "compaction group") + require.Zero(t, cgLen, "compaction group length") + require.Zero(t, genLen, "generation count") + } + + for _, test := range areFullyCompactedTests { + t.Run(test.name, func(t *testing.T) { + ffs := &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return test.fs + }, + } - for i, p := range expFiles2 { - if got, exp := tsm[1][i], p.Path; got != exp { - t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp) - } - } -} + if len(test.bc) > 0 { + err := ffs.SetBlockCounts(test.bc) + require.NoError(t, err, "setting block counts") + } -func TestDefaultPlanner_PlanOptimize_Optimized(t *testing.T) { - data := []tsm1.FileStat{ - { - Path: "01-03.tsm1", - Size: 251 * 1024 * 1024, - }, - { - Path: "01-04.tsm1", - Size: 1 * 1024 * 1024, - }, - { - Path: "01-05.tsm1", - Size: 2 * 1024 * 1024 * 1024, - }, - } + cp := tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration) + expectedFullyCompacted(cp, test.expectedFullyCompactedReasonExp) - cp := tsm1.NewDefaultPlanner( - &fakeFileStore{ - PathsFn: func() []tsm1.FileStat { - return data - }, - }, tsdb.DefaultCompactFullWriteColdDuration, - ) + // Reverse test files and re-run tests + slices.Reverse(test.fs) + if len(test.bc) > 0 { + slices.Reverse(test.bc) + err := ffs.SetBlockCounts(test.bc) + require.NoError(t, err, "setting reverse block counts") + } - expFiles := []tsm1.FileStat{} - tsm, pLen := cp.PlanOptimize() - if exp, got := len(expFiles), len(tsm); got != exp { - t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) - } else if pLen != int64(len(tsm)) { - t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) + cp = tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration) + expectedFullyCompacted(cp, test.expectedFullyCompactedReasonExp) + }) } } @@ -2467,7 +2815,7 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) { ) expFiles := []tsm1.FileStat{data[0], data[1], data[2]} - tsm, pLen := cp.PlanOptimize() + tsm, pLen, _ := cp.PlanOptimize() if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } else if pLen != int64(len(tsm)) { @@ -2591,10 +2939,13 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { PathsFn: func() []tsm1.FileStat { return testSet }, - blockCount: 1000, 
} + err := ffs.SetBlockCounts([]int{tsdb.DefaultMaxPointsPerBlock, tsdb.DefaultMaxPointsPerBlock, tsdb.DefaultMaxPointsPerBlock, tsdb.DefaultMaxPointsPerBlock}) + require.NoError(t, err, "SetBlockCounts") + cp := tsm1.NewDefaultPlanner(ffs, time.Nanosecond) + plan, pLen := cp.Plan(time.Now().Add(-time.Second)) // first verify that our test set would return files if exp, got := 4, len(plan[0]); got != exp { @@ -2632,9 +2983,17 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { PathsFn: func() []tsm1.FileStat { return over }, - blockCount: 1000, } + err = overFs.SetBlockCounts([]int{ + tsdb.DefaultMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + tsdb.DefaultMaxPointsPerBlock, + }) + require.NoError(t, err, "SetBlockCounts") + cp.FileStore = overFs plan, pLen = cp.Plan(time.Now().Add(-time.Second)) if exp, got := 0, len(plan); got != exp { @@ -2644,7 +3003,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { } cp.Release(plan) - plan, pLen = cp.PlanOptimize() + plan, pLen, _ = cp.PlanOptimize() // ensure the optimize planner would pick this up if exp, got := 1, len(plan); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) @@ -2712,14 +3071,21 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) { }, } - cp := tsm1.NewDefaultPlanner( - &fakeFileStore{ - blockCount: 1000, - PathsFn: func() []tsm1.FileStat { - return data - }, + fs := &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data }, - time.Hour) + } + + var bcs []int + for range data { + bcs = append(bcs, tsdb.DefaultMaxPointsPerBlock) + } + + err := fs.SetBlockCounts(bcs) + require.NoError(t, err, "SetBlockCounts") + + cp := tsm1.NewDefaultPlanner(fs, time.Hour) tsm, pLen := cp.Plan(time.Now().Add(-24 * time.Hour)) if exp, got := 1, len(tsm); got != exp { @@ -2755,9 +3121,16 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { PathsFn: func() []tsm1.FileStat { return testSet }, - blockCount: 100, } + var bcs []int + for range testSet { + bcs = append(bcs, 100) + } + + err := ffs.SetBlockCounts(bcs) + require.NoError(t, err, "SetBlockCounts") + cp := tsm1.NewDefaultPlanner( ffs, time.Nanosecond, @@ -2788,9 +3161,16 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { PathsFn: func() []tsm1.FileStat { return over }, - blockCount: 100, } + bcs = make([]int, 0) + for range over { + bcs = append(bcs, 100) + } + + err = overFs.SetBlockCounts(bcs) + require.NoError(t, err, "SetBlockCounts") + cp.FileStore = overFs cGroups, pLen := cp.Plan(time.Now().Add(-time.Second)) if exp, got := 1, len(cGroups); got != exp { @@ -3099,8 +3479,10 @@ func MustOpenTSMReader(name string) *tsm1.TSMReader { type fakeFileStore struct { PathsFn func() []tsm1.FileStat lastModified time.Time - blockCount int - readers []*tsm1.TSMReader + // fakeFileStore blockCount holds a map of file paths from + // PathsFn.FileStat to a mock block count as an integer. + blockCount map[string]int + readers []*tsm1.TSMReader } func (w *fakeFileStore) Stats() []tsm1.FileStat { @@ -3115,8 +3497,30 @@ func (w *fakeFileStore) LastModified() time.Time { return w.lastModified } +// Utility function to set mock block counts on a TSM file basis +// If the number of inputs supplied is less then the amount of TSM +// files in a given PathsFn it will error. 
+func (w *fakeFileStore) SetBlockCounts(inputs []int) error { + if len(inputs) != len(w.PathsFn()) { + return errors.New("inputs []int length does not equal length of PathsFn()") + } + + bc := make(map[string]int) + for i, f := range w.PathsFn() { + bc[f.Path] = inputs[i] + } + + w.blockCount = bc + return nil +} + func (w *fakeFileStore) BlockCount(path string, idx int) int { - return w.blockCount + for _, f := range w.PathsFn() { + if f.Path == path { + return w.blockCount[path] + } + } + return 0 } func (w *fakeFileStore) TSMReader(path string) (*tsm1.TSMReader, error) { diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index a27e41f8254..7c50be10894 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2130,8 +2130,9 @@ func (e *Engine) compact(wg *sync.WaitGroup) { atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, len4) // If no full compactions are need, see if an optimize is needed + var genLen int64 if len(level4Groups) == 0 { - level4Groups, len4 = e.CompactionPlan.PlanOptimize() + level4Groups, len4, genLen = e.CompactionPlan.PlanOptimize() atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, len4) } @@ -2166,6 +2167,17 @@ func (e *Engine) compact(wg *sync.WaitGroup) { level3Groups = level3Groups[1:] } case 4: + // This is a heuristic. 100_000 points per block is suitable for when we have a + // single generation with multiple files at max block size under 2 GB. + if genLen == 1 { + // Log TSM files that will have an increased points per block count. + for _, f := range level4Groups[0] { + e.logger.Info("TSM optimized compaction on single generation running, increasing total points per block to 100_000.", zap.String("path", f)) + } + e.Compactor.Size = tsdb.AggressiveMaxPointsPerBlock + } else { + e.Compactor.Size = tsdb.DefaultMaxPointsPerBlock + } if e.compactFull(level4Groups[0], wg) { level4Groups = level4Groups[1:] } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 947f4f65842..ae7cc19a330 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -2889,7 +2889,7 @@ type mockPlanner struct{} func (m *mockPlanner) Plan(lastWrite time.Time) ([]tsm1.CompactionGroup, int64) { return nil, 0 } func (m *mockPlanner) PlanLevel(level int) ([]tsm1.CompactionGroup, int64) { return nil, 0 } -func (m *mockPlanner) PlanOptimize() ([]tsm1.CompactionGroup, int64) { return nil, 0 } +func (m *mockPlanner) PlanOptimize() ([]tsm1.CompactionGroup, int64, int64) { return nil, 0, 0 } func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {} func (m *mockPlanner) FullyCompacted() (bool, string) { return false, "not compacted" } func (m *mockPlanner) ForceFull() {}
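Reviewer note: the sketch below is a minimal, standalone illustration of the single-generation heuristic that the new FullyCompacted branch applies, not the planner's code itself. The tsmFile type and needsSingleGenCompaction function are hypothetical stand-ins for the planner's generation and FileStore machinery.

// Minimal sketch of the single-generation "keep compacting?" heuristic.
// Assumptions: tsmFile and needsSingleGenCompaction are illustrative only.
package main

import "fmt"

const (
	defaultMaxPointsPerBlock    = 1000
	aggressiveMaxPointsPerBlock = defaultMaxPointsPerBlock * 100 // 100,000
	maxTSMFileSize              = uint32(2048 * 1024 * 1024)     // 2 GB
)

// tsmFile is a hypothetical stand-in for a TSM file's stats.
type tsmFile struct {
	size       uint32 // file size in bytes
	blockCount int    // points per block reported for the file's first block
}

// needsSingleGenCompaction mirrors the new check: a lone generation with more
// than one file still under 2 GB, where at least one file has not yet been
// rewritten at the aggressive points-per-block count, should keep compacting.
func needsSingleGenCompaction(files []tsmFile) bool {
	if len(files) <= 1 {
		return false
	}
	aggressiveCount := 0
	underMaxSizeCount := 0
	for _, f := range files {
		if f.blockCount >= aggressiveMaxPointsPerBlock {
			aggressiveCount++
		}
		if f.size < maxTSMFileSize {
			underMaxSizeCount++
		}
	}
	return underMaxSizeCount > 1 && aggressiveCount < len(files)
}

func main() {
	// Several small files still at the default block size: keep compacting.
	fmt.Println(needsSingleGenCompaction([]tsmFile{
		{size: 300 * 1024 * 1024, blockCount: defaultMaxPointsPerBlock},
		{size: 200 * 1024 * 1024, blockCount: defaultMaxPointsPerBlock},
	})) // true

	// Every file already at the aggressive block size: stop.
	fmt.Println(needsSingleGenCompaction([]tsmFile{
		{size: 2048 * 1024 * 1024, blockCount: aggressiveMaxPointsPerBlock},
		{size: 650 * 1024 * 1024, blockCount: aggressiveMaxPointsPerBlock},
	})) // false
}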
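Similarly, a hedged sketch of how the new third return value from PlanOptimize (the generation count) drives the compactor's points-per-block choice, mirroring the engine.compact change above; pickPointsPerBlock is a hypothetical helper, not the engine's API.

// Sketch only: the single-generation optimize path gets aggressive 100,000-point
// blocks; every other path keeps the 1,000-point default, per the engine.go change.
package main

import "fmt"

const (
	defaultMaxPointsPerBlock    = 1000
	aggressiveMaxPointsPerBlock = defaultMaxPointsPerBlock * 100
)

// pickPointsPerBlock returns the block size a full compaction would use,
// given the generation count reported by the optimize planner.
func pickPointsPerBlock(generationCount int64) int {
	if generationCount == 1 {
		return aggressiveMaxPointsPerBlock
	}
	return defaultMaxPointsPerBlock
}

func main() {
	fmt.Println(pickPointsPerBlock(1)) // 100000
	fmt.Println(pickPointsPerBlock(3)) // 1000
}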