diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index c4be1156766..63eee35f693 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -96,7 +96,7 @@ type CompactionGroup []string type CompactionPlanner interface { Plan(lastWrite time.Time) ([]CompactionGroup, int64) PlanLevel(level int) ([]CompactionGroup, int64) - PlanOptimize() ([]CompactionGroup, int64) + PlanOptimize() ([]CompactionGroup, int64, int64) Release(group []CompactionGroup) FullyCompacted() (bool, string) @@ -225,6 +225,10 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) { } else if gens.hasTombstones() { return false, "not fully compacted and not idle because of tombstones" } else { + // Safety: check for first index so we don't accidentally do out of bounds access + if len(gens) == 1 && len(gens[0].files) > 1 && gens[0].size() < uint64(maxTSMFileSize) { + return false, "not fully compacted and not idle because group size under 2 GB and more then single file" + } return true, "" } } @@ -335,13 +339,13 @@ func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) { // PlanOptimize returns all TSM files if they are in different generations in order // to optimize the index across TSM files. Each returned compaction group can be // compacted concurrently. -func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { +func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64, int64) { // If a full plan has been requested, don't plan any levels which will prevent // the full plan from acquiring them. c.mu.RLock() if c.forceFull { c.mu.RUnlock() - return nil, 0 + return nil, 0, 0 } c.mu.RUnlock() @@ -350,10 +354,20 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { // split across several files in sequence. generations := c.findGenerations(true) - // If there is only one generation and no tombstones, then there's nothing to - // do. 
- if len(generations) <= 1 && !generations.hasTombstones() { - return nil, 0 + // Safety check for potential 0 generations. + if len(generations) < 1 { + return nil, 0, 0 + } + + // Return if there's only a single file and single generation. + if len(generations) == 1 && len(generations[0].files) == 1 { + return nil, 0, 0 + } + + // If there is a single generation, no tombstones, and the entire group size (all TSM files in the generation) + // is over or equal to 2 GB (Max TSM file size) there is nothing to do and we will return. + if len(generations) == 1 && generations[0].size() >= uint64(maxTSMFileSize) && !generations.hasTombstones() { + return nil, 0, 0 } // Group each generation by level such that two adjacent generations in the same @@ -363,11 +377,6 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { for i := 0; i < len(generations); i++ { cur := generations[i] - // Skip the file if it's over the max size and contains a full block and it does not have any tombstones - if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() { - continue - } - // See if this generation is orphan'd which would prevent it from being further // compacted until a final full compactin runs. if i < len(generations)-1 { @@ -392,21 +401,21 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { } // Only optimize level 4 files since using lower-levels will collide - // with the level planners + // with the level planners. If this is a single generation optimization + // do not skip any levels. var levelGroups []tsmGenerations - for _, cur := range groups { - if cur.level() == 4 { - levelGroups = append(levelGroups, cur) + if len(generations) == 1 { + levelGroups = append(levelGroups, groups...) 
+ } else { + for _, cur := range groups { + if cur.level() == 4 { + levelGroups = append(levelGroups, cur) + } } } var cGroups []CompactionGroup for _, group := range levelGroups { - // Skip the group if it's not worthwhile to optimize it - if len(group) < 4 && !group.hasTombstones() { - continue - } - var cGroup CompactionGroup for _, gen := range group { for _, file := range gen.files { @@ -417,11 +426,18 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) { cGroups = append(cGroups, cGroup) } + // The third return value is the generation length. + // Need to use this to decide how many points per block to use during compaction. + // This value is mostly ignored in normal compaction code paths, but, + // for the edge case where there is a single generation with many + // files and a group size under 2 GB we need to know that there is a single + // generation so the block size can be adjusted to allow for more optimal + // compaction. if !c.acquire(cGroups) { - return nil, int64(len(cGroups)) + return nil, int64(len(cGroups)), int64(len(generations)) } - return cGroups, int64(len(cGroups)) + return cGroups, int64(len(cGroups)), int64(len(generations)) } // Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns @@ -905,6 +921,10 @@ func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, e // compact writes multiple smaller TSM files into 1 or more larger files. func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) { + // Sets the points per block size. The larger this value is set + // the more points there will be in a single index. Under normal + // conditions this should always be 1000 but there is an edge case + // where this is increased. 
size := c.Size if size <= 0 { size = tsdb.DefaultMaxPointsPerBlock diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index c90beb6b1cf..2464cee2bc8 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" + "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -2200,11 +2201,13 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) { ) expFiles := []tsm1.FileStat{} - tsm, pLen := cp.PlanOptimize() + tsm, pLen, gLen := cp.PlanOptimize() if exp, got := len(expFiles), len(tsm); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } else if pLen != int64(len(tsm)) { t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp) + } else if gLen != int64(3) { + t.Fatalf("generation len plan mismatch: got %v, exp %v", gLen, 3) } } @@ -2249,7 +2252,7 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) { ) expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3], data[4], data[5]} - tsm, pLen := cp.PlanOptimize() + tsm, pLen, _ := cp.PlanOptimize() if exp, got := 1, len(tsm); exp != got { t.Fatalf("group length mismatch: got %v, exp %v", got, exp) } else if pLen != int64(len(tsm)) { t.Fatalf("plan length mismatch: got %v, exp %v", pLen, exp) } @@ -2267,6 +2270,218 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) { } } +// This test is added to account for many TSM files within a group being over 2 GB +// we want to ensure that the shard will be planned. 
+func TestDefaultPlanner_PlanOptimize_LargeMultiGeneration(t *testing.T) { + data := []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "01-08.tsm1", + Size: 1048 * 1024 * 1024, + }, + { + Path: "02-05.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-06.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-07.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "02-08.tsm1", + Size: 1048 * 1024 * 1024, + }, + { + Path: "03-04.tsm1", + Size: 2048 * 1024 * 1024, + }, + { + Path: "03-05.tsm1", + Size: 512 * 1024 * 1024, + }, + } + + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data + }, + }, tsdb.DefaultCompactFullWriteColdDuration, + ) + + expFiles := make([]tsm1.FileStat, 0) + for _, file := range data { + expFiles = append(expFiles, file) + } + + tsm, pLen, _ := cp.PlanOptimize() + require.Equal(t, 1, len(tsm), "group length mismatch: got %d, exp %d", len(tsm), 1) + require.Equal(t, int64(len(tsm)), pLen, "tsm file plan length mismatch: got %d, exp %d", pLen, int64(len(tsm))) + require.Equal(t, len(expFiles), len(tsm[0]), "tsm file length mismatch: got %d, exp %d", len(tsm[0]), len(expFiles)) +} + +// This test is added to account for a single generation that has a group size +// under 2 GB so it should be further compacted to a single file. 
+func TestDefaultPlanner_PlanOptimize_SmallSingleGeneration(t *testing.T) { + // ~650 MB total group size + data := []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 300 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 200 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 100 * 1024 * 1024, + }, + { + Path: "01-08.tsm1", + Size: 50 * 1024 * 1024, + }, + } + + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data + }, + }, tsdb.DefaultCompactFullWriteColdDuration, + ) + + expFiles := make([]tsm1.FileStat, 0) + for _, file := range data { + expFiles = append(expFiles, file) + } + + tsm, pLen, gLen := cp.PlanOptimize() + require.Equal(t, 1, len(tsm), "group length mismatch: got %d, exp %d", len(tsm), 1) + require.Equal(t, int64(len(tsm)), pLen, "tsm file plan length mismatch: got %d, exp %d", pLen, int64(len(expFiles))) + require.Equal(t, int64(1), gLen, "generation length mismatch: got %d, exp %d", gLen, 1) + require.Equal(t, len(expFiles), len(tsm[0]), "tsm file length mismatch: got %d, exp %d", len(tsm[0]), expFiles) +} + +// This test is added to account for a single generation that has a group size +// under 2 GB and has less than level 4 files it should be further compacted to a single file. +// FullyCompacted should NOT skip over opening this shard. 
+func TestDefaultPlanner_PlanOptimize_SmallSingleGenerationUnderLevel4(t *testing.T) { + // ~650 MB total group size + data := []tsm1.FileStat{ + { + Path: "01-02.tsm1", + Size: 300 * 1024 * 1024, + }, + { + Path: "01-03.tsm1", + Size: 200 * 1024 * 1024, + }, + { + Path: "01-04.tsm1", + Size: 100 * 1024 * 1024, + }, + } + + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data + }, + }, tsdb.DefaultCompactFullWriteColdDuration, + ) + + expFiles := make([]tsm1.FileStat, 0) + for _, file := range data { + expFiles = append(expFiles, file) + } + + tsm, pLen, gLen := cp.PlanOptimize() + require.Equal(t, 1, len(tsm), "group length mismatch: got %d, exp %d", len(tsm), 1) + require.Equal(t, int64(len(tsm)), pLen, "tsm file plan length mismatch: got %d, exp %d", pLen, int64(len(expFiles))) + require.Equal(t, int64(1), gLen, "generation length mismatch: got %d, exp %d", gLen, 1) + require.Equal(t, len(expFiles), len(tsm[0]), "tsm file length mismatch: got %d, exp %d", len(tsm[0]), expFiles) +} + +// This test is added to account for a single generation that has a group size +// under 2 GB so it should be further compacted to a single file. +// FullyCompacted should NOT skip over opening this shard. 
+func TestDefaultPlanner_FullyCompacted_SmallSingleGeneration(t *testing.T) { + // ~650 MB total group size + data := []tsm1.FileStat{ + { + Path: "01-05.tsm1", + Size: 300 * 1024 * 1024, + }, + { + Path: "01-06.tsm1", + Size: 200 * 1024 * 1024, + }, + { + Path: "01-07.tsm1", + Size: 100 * 1024 * 1024, + }, + { + Path: "01-08.tsm1", + Size: 50 * 1024 * 1024, + }, + } + + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data + }, + }, tsdb.DefaultCompactFullWriteColdDuration, + ) + + compacted, reason := cp.FullyCompacted() + reasonExp := "not fully compacted and not idle because group size under 2 GB and more then single file" + require.Equal(t, reason, reasonExp) + require.Equal(t, false, compacted) +} + +// This test is added to account for halting state after +// TestDefaultPlanner_FullyCompacted_SmallSingleGeneration +// will need to ensure that once we have single TSM file under 2 GB we stop +func TestDefaultPlanner_FullyCompacted_SmallSingleGeneration_Halt(t *testing.T) { + // ~650 MB total group size + data := []tsm1.FileStat{ + { + Path: "01-09.tsm1", + Size: 650 * 1024 * 1024, + }, + } + + cp := tsm1.NewDefaultPlanner( + &fakeFileStore{ + PathsFn: func() []tsm1.FileStat { + return data + }, + }, tsdb.DefaultCompactFullWriteColdDuration, + ) + + compacted, reason := cp.FullyCompacted() + reasonExp := "" + require.Equal(t, reason, reasonExp) + require.Equal(t, true, compacted) +} + func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) { data := []tsm1.FileStat{ { @@ -2322,7 +2537,7 @@ func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) { expFiles1 := []tsm1.FileStat{data[0], data[1], data[2], data[3]} expFiles2 := []tsm1.FileStat{data[6], data[7], data[8], data[9]} - tsm, pLen := cp.PlanOptimize() + tsm, pLen, _ := cp.PlanOptimize() if exp, got := 2, len(tsm); exp != got { t.Fatalf("group length mismatch: got %v, exp %v", got, exp) } else if pLen != int64(len(tsm)) { @@ -2375,7 +2590,7 @@ func 
TestDefaultPlanner_PlanOptimize_Optimized(t *testing.T) { ) expFiles := []tsm1.FileStat{} - tsm, pLen := cp.PlanOptimize() + tsm, pLen, _ := cp.PlanOptimize() if exp, got := len(expFiles), len(tsm); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } else if pLen != int64(len(tsm)) { @@ -2409,7 +2624,7 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) { ) expFiles := []tsm1.FileStat{data[0], data[1], data[2]} - tsm, pLen := cp.PlanOptimize() + tsm, pLen, _ := cp.PlanOptimize() if exp, got := len(expFiles), len(tsm[0]); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) } else if pLen != int64(len(tsm)) { @@ -2586,7 +2801,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { } cp.Release(plan) - plan, pLen = cp.PlanOptimize() + plan, pLen, _ = cp.PlanOptimize() // ensure the optimize planner would pick this up if exp, got := 1, len(plan); got != exp { t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index a27e41f8254..4e1d7defeb7 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -2130,8 +2130,9 @@ func (e *Engine) compact(wg *sync.WaitGroup) { atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, len4) // If no full compactions are need, see if an optimize is needed + var genLen int64 if len(level4Groups) == 0 { - level4Groups, len4 = e.CompactionPlan.PlanOptimize() + level4Groups, len4, genLen = e.CompactionPlan.PlanOptimize() atomic.StoreInt64(&e.stats.TSMOptimizeCompactionsQueue, len4) } @@ -2166,6 +2167,17 @@ func (e *Engine) compact(wg *sync.WaitGroup) { level3Groups = level3Groups[1:] } case 4: + // This is a heuristic. 100_000 points per block is suitable for when we have a + // single generation with multiple files at max block size under 2 GB. + if genLen == 1 { + // Log TSM files that will have an increased points per block count. 
+ for _, f := range level4Groups[0] { + e.logger.Info("TSM optimized compaction on single generation running, increasing total points per block to 100_000.", zap.String("path", f)) + } + e.Compactor.Size = tsdb.DefaultMaxPointsPerBlock * 100 + } else { + e.Compactor.Size = tsdb.DefaultMaxPointsPerBlock + } if e.compactFull(level4Groups[0], wg) { level4Groups = level4Groups[1:] } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 947f4f65842..ae7cc19a330 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -2889,7 +2889,7 @@ type mockPlanner struct{} func (m *mockPlanner) Plan(lastWrite time.Time) ([]tsm1.CompactionGroup, int64) { return nil, 0 } func (m *mockPlanner) PlanLevel(level int) ([]tsm1.CompactionGroup, int64) { return nil, 0 } -func (m *mockPlanner) PlanOptimize() ([]tsm1.CompactionGroup, int64) { return nil, 0 } +func (m *mockPlanner) PlanOptimize() ([]tsm1.CompactionGroup, int64, int64) { return nil, 0, 0 } func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {} func (m *mockPlanner) FullyCompacted() (bool, string) { return false, "not compacted" } func (m *mockPlanner) ForceFull() {}