diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index c4be1156766..53d7bf6b9ef 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -225,6 +225,10 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) {
 	} else if gens.hasTombstones() {
 		return false, "not fully compacted and not idle because of tombstones"
 	} else {
+		// Safety: check the first index exists so we don't accidentally do an out-of-bounds access.
+		if len(gens) == 1 && len(gens[0].files) > 1 && gens[0].size() < uint64(maxTSMFileSize) {
+			return false, ""
+		}
 		return true, ""
 	}
 }
@@ -350,9 +354,14 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
 	// split across several files in sequence.
 	generations := c.findGenerations(true)
 
-	// If there is only one generation and no tombstones, then there's nothing to
-	// do.
-	if len(generations) <= 1 && !generations.hasTombstones() {
+	// Safety check for potential 0 generations.
+	if len(generations) < 1 {
+		return nil, 0
+	}
+	// If there is a single generation, no tombstones, and the entire group size (all TSM files in the generation)
+	// is greater than or equal to 2 GB (the max TSM file size), there is nothing to do and we return. Please see
+	// https://github.com/influxdata/plutonium/issues/4201 for more information regarding this logic.
+	if len(generations) == 1 && generations[0].size() >= uint64(maxTSMFileSize) && !generations.hasTombstones() {
 		return nil, 0
 	}
 
@@ -363,11 +372,6 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
 	for i := 0; i < len(generations); i++ {
 		cur := generations[i]
 
-		// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
-		if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
-			continue
-		}
-
 		// See if this generation is orphan'd which would prevent it from being further
 		// compacted until a final full compactin runs.
 		if i < len(generations)-1 {
@@ -402,12 +406,8 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
 
 	var cGroups []CompactionGroup
 	for _, group := range levelGroups {
-		// Skip the group if it's not worthwhile to optimize it
-		if len(group) < 4 && !group.hasTombstones() {
-			continue
-		}
-
 		var cGroup CompactionGroup
+
 		for _, gen := range group {
			for _, file := range gen.files {
 				cGroup = append(cGroup, file.Path)
@@ -905,6 +905,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, e
 
 // compact writes multiple smaller TSM files into 1 or more larger files.
 func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
+	// TODO: Figure out how to overwrite this size variable
 	size := c.Size
 	if size <= 0 {
 		size = tsdb.DefaultMaxPointsPerBlock
diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go
index c90beb6b1cf..86772b0cdf7 100644
--- a/tsdb/engine/tsm1/compact_test.go
+++ b/tsdb/engine/tsm1/compact_test.go
@@ -2267,6 +2267,198 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) {
 	}
 }
 
+// This test is added to account for many TSM files within a group being over 2 GB;
+// we want to ensure that the shard will be planned.
+func TestDefaultPlanner_PlanOptimize_LargeMultiGeneration(t *testing.T) {
+	data := []tsm1.FileStat{
+		{
+			Path: "01-05.tsm1",
+			Size: 2048 * 1024 * 1024,
+		},
+		{
+			Path: "01-06.tsm1",
+			Size: 2048 * 1024 * 1024,
+		},
+		{
+			Path: "01-07.tsm1",
+			Size: 2048 * 1024 * 1024,
+		},
+		{
+			Path: "01-08.tsm1",
+			Size: 1048 * 1024 * 1024,
+		},
+		{
+			Path: "02-05.tsm1",
+			Size: 2048 * 1024 * 1024,
+		},
+		{
+			Path: "02-06.tsm1",
+			Size: 2048 * 1024 * 1024,
+		},
+		{
+			Path: "02-07.tsm1",
+			Size: 2048 * 1024 * 1024,
+		},
+		{
+			Path: "02-08.tsm1",
+			Size: 1048 * 1024 * 1024,
+		},
+		{
+			Path: "03-04.tsm1",
+			Size: 2048 * 1024 * 1024,
+		},
+		{
+			Path: "03-05.tsm1",
+			Size: 512 * 1024 * 1024,
+		},
+	}
+
+	cp := tsm1.NewDefaultPlanner(
+		&fakeFileStore{
+			PathsFn: func() []tsm1.FileStat {
+				return data
+			},
+		}, tsdb.DefaultCompactFullWriteColdDuration,
+	)
+
+	expFiles := make([]tsm1.FileStat, 0)
+	for _, file := range data {
+		expFiles = append(expFiles, file)
+	}
+
+	tsm, pLen := cp.PlanOptimize()
+	if exp, got := 1, len(tsm); exp != got {
+		t.Fatalf("group length mismatch: got %v, exp %v", got, exp)
+	} else if pLen != int64(len(tsm)) {
+		t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
+	}
+
+	if exp, got := len(expFiles), len(tsm[0]); got != exp {
+		t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+	}
+}
+
+// This test is added to account for a single generation that has a group size
+// under 2 GB so it should be further compacted to a single file.
+// Please see: https://github.com/influxdata/EAR/issues/5531
+func TestDefaultPlanner_PlanOptimize_SmallSingleGeneration(t *testing.T) {
+	// ~650 MB total group size
+	data := []tsm1.FileStat{
+		{
+			Path: "01-05.tsm1",
+			Size: 300 * 1024 * 1024,
+		},
+		{
+			Path: "01-06.tsm1",
+			Size: 200 * 1024 * 1024,
+		},
+		{
+			Path: "01-07.tsm1",
+			Size: 100 * 1024 * 1024,
+		},
+		{
+			Path: "01-08.tsm1",
+			Size: 50 * 1024 * 1024,
+		},
+	}
+
+	cp := tsm1.NewDefaultPlanner(
+		&fakeFileStore{
+			PathsFn: func() []tsm1.FileStat {
+				return data
+			},
+		}, tsdb.DefaultCompactFullWriteColdDuration,
+	)
+
+	expFiles := make([]tsm1.FileStat, 0)
+	for _, file := range data {
+		expFiles = append(expFiles, file)
+	}
+
+	tsm, pLen := cp.PlanOptimize()
+	if exp, got := 1, len(tsm); exp != got {
+		t.Fatalf("group length mismatch: got %v, exp %v", got, exp)
+	} else if pLen != int64(len(tsm)) {
+		t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, exp)
+	}
+
+	if exp, got := len(expFiles), len(tsm[0]); got != exp {
+		t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
+	}
+}
+
+// This test is added to account for a single generation that has a group size
+// under 2 GB so it should be further compacted to a single file.
+// Please see: https://github.com/influxdata/EAR/issues/5531
+// FullyCompacted should NOT skip over opening this shard.
+func TestDefaultPlanner_FullyCompacted_SmallSingleGeneration(t *testing.T) {
+	// ~650 MB total group size
+	data := []tsm1.FileStat{
+		{
+			Path: "01-05.tsm1",
+			Size: 300 * 1024 * 1024,
+		},
+		{
+			Path: "01-06.tsm1",
+			Size: 200 * 1024 * 1024,
+		},
+		{
+			Path: "01-07.tsm1",
+			Size: 100 * 1024 * 1024,
+		},
+		{
+			Path: "01-08.tsm1",
+			Size: 50 * 1024 * 1024,
+		},
+	}
+
+	cp := tsm1.NewDefaultPlanner(
+		&fakeFileStore{
+			PathsFn: func() []tsm1.FileStat {
+				return data
+			},
+		}, tsdb.DefaultCompactFullWriteColdDuration,
+	)
+
+	compacted, reason := cp.FullyCompacted()
+	reasonExp := ""
+	if exp, got := false, compacted; exp != got {
+		t.Fatalf("fully compacted mismatch: got %v, exp %v", got, exp)
+	} else if reason != reasonExp {
+		t.Fatalf("fully compacted reason mismatch: got %q, exp %q", reason, reasonExp)
+	}
+}
+
+// This test is added to account for the halting state after
+// TestDefaultPlanner_FullyCompacted_SmallSingleGeneration;
+// we need to ensure that once we have a single TSM file under 2 GB we stop.
+// Please see: https://github.com/influxdata/EAR/issues/5531
+func TestDefaultPlanner_FullyCompacted_SmallSingleGeneration_Halt(t *testing.T) {
+	// ~650 MB total group size
+	data := []tsm1.FileStat{
+		{
+			Path: "01-09.tsm1",
+			Size: 650 * 1024 * 1024,
+		},
+	}
+
+	cp := tsm1.NewDefaultPlanner(
+		&fakeFileStore{
+			PathsFn: func() []tsm1.FileStat {
+				return data
+			},
+		}, tsdb.DefaultCompactFullWriteColdDuration,
+	)
+
+	compacted, reason := cp.FullyCompacted()
+	reasonExp := ""
+	if exp, got := true, compacted; exp != got {
+		t.Fatalf("fully compacted mismatch: got %v, exp %v", got, exp)
+	} else if reason != reasonExp {
+		t.Fatalf("fully compacted reason mismatch: got %q, exp %q", reason, reasonExp)
+	}
+}
+
 func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) {
 	data := []tsm1.FileStat{
 		{