Skip to content

Commit

Permalink
feat: This PR adds a resolution for some edge cases we were seeing wi…
Browse files Browse the repository at this point in the history
…th our compactor.

Please see: influxdata/plutonium#4201 (comment) for more
details. This PR changes the algorithm for compaction to allow for single generations that
have a group size of >= 2 GB to be scheduled for compaction. It also is adjusted to only
halt optimized compaction once there are no tombstones, and a single generation. If the group size
is less than 2 GB it will check to ensure there is only 1 file in the shard to halt compaction.

Closes influxdata/plutonium#4201
Closes influxdata/EAR#5672
Closes influxdata/EAR#5531
  • Loading branch information
devanbenz committed Dec 13, 2024
1 parent 45a8227 commit cab638c
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 13 deletions.
27 changes: 14 additions & 13 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) {
} else if gens.hasTombstones() {
return false, "not fully compacted and not idle because of tombstones"
} else {
// Safety: check for first index so we don't accidentally do out of bounds access
if len(gens) == 1 && len(gens[0].files) > 1 && gens[0].size() < uint64(maxTSMFileSize) {
return false, ""
}
return true, ""
}
}
Expand Down Expand Up @@ -350,9 +354,14 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
// split across several files in sequence.
generations := c.findGenerations(true)

// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
// Safety check for potential 0 generations.
if len(generations) < 1 {
return nil, 0
}
// If there is a single generation, no tombstones, and the entire group size (all TSM files in the generation)
// is over or equal to 2 GB (Max TSM file size) there is nothing to do and we will return. Please see: https://github.com/influxdata/plutonium/issues/4201
// for more information regarding this logic.
if len(generations) == 1 && generations[0].size() >= uint64(maxTSMFileSize) && !generations.hasTombstones() {
return nil, 0
}

Expand All @@ -363,11 +372,6 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
for i := 0; i < len(generations); i++ {
cur := generations[i]

// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
continue
}

// See if this generation is orphaned, which would prevent it from being further
// compacted until a final full compaction runs.
if i < len(generations)-1 {
Expand Down Expand Up @@ -402,12 +406,8 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {

var cGroups []CompactionGroup
for _, group := range levelGroups {
// Skip the group if it's not worthwhile to optimize it
if len(group) < 4 && !group.hasTombstones() {
continue
}

var cGroup CompactionGroup

for _, gen := range group {
for _, file := range gen.files {
cGroup = append(cGroup, file.Path)
Expand Down Expand Up @@ -905,6 +905,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, e

// compact writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
// TODO: Figure out how to overwrite this size variable
size := c.Size
if size <= 0 {
size = tsdb.DefaultMaxPointsPerBlock
Expand Down
192 changes: 192 additions & 0 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,198 @@ func TestDefaultPlanner_PlanOptimize_Level4(t *testing.T) {
}
}

// This test is added to account for many TSM files within a group being over 2 GB;
// we want to ensure that the shard will still be planned for optimized compaction.
func TestDefaultPlanner_PlanOptimize_LargeMultiGeneration(t *testing.T) {
	// Three generations; the first two each total well over the 2 GB
	// max TSM file size, the third is smaller.
	data := []tsm1.FileStat{
		{
			Path: "01-05.tsm1",
			Size: 2048 * 1024 * 1024,
		},
		{
			Path: "01-06.tsm1",
			Size: 2048 * 1024 * 1024,
		},
		{
			Path: "01-07.tsm1",
			Size: 2048 * 1024 * 1024,
		},
		{
			Path: "01-08.tsm1",
			Size: 1048 * 1024 * 1024,
		},
		{
			Path: "02-05.tsm1",
			Size: 2048 * 1024 * 1024,
		},
		{
			Path: "02-06.tsm1",
			Size: 2048 * 1024 * 1024,
		},
		{
			Path: "02-07.tsm1",
			Size: 2048 * 1024 * 1024,
		},
		{
			Path: "02-08.tsm1",
			Size: 1048 * 1024 * 1024,
		},
		{
			Path: "03-04.tsm1",
			Size: 2048 * 1024 * 1024,
		},
		{
			Path: "03-05.tsm1",
			Size: 512 * 1024 * 1024,
		},
	}

	cp := tsm1.NewDefaultPlanner(
		&fakeFileStore{
			PathsFn: func() []tsm1.FileStat {
				return data
			},
		}, tsdb.DefaultCompactFullWriteColdDuration,
	)

	tsm, pLen := cp.PlanOptimize()
	if exp, got := 1, len(tsm); exp != got {
		t.Fatalf("group length mismatch: got %v, exp %v", got, exp)
	} else if pLen != int64(len(tsm)) {
		// The plan length returned must agree with the number of planned groups.
		t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, int64(len(tsm)))
	}

	// Every input file should be included in the single planned group.
	if exp, got := len(data), len(tsm[0]); got != exp {
		t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
	}
}

// This test is added to account for a single generation that has a group size
// under 2 GB so it should be further compacted to a single file.
// Please see: https://github.com/influxdata/EAR/issues/5531
func TestDefaultPlanner_PlanOptimize_SmallSingleGeneration(t *testing.T) {
	// ~650 MB total group size, all files in generation 01.
	data := []tsm1.FileStat{
		{
			Path: "01-05.tsm1",
			Size: 300 * 1024 * 1024,
		},
		{
			Path: "01-06.tsm1",
			Size: 200 * 1024 * 1024,
		},
		{
			Path: "01-07.tsm1",
			Size: 100 * 1024 * 1024,
		},
		{
			Path: "01-08.tsm1",
			Size: 50 * 1024 * 1024,
		},
	}

	cp := tsm1.NewDefaultPlanner(
		&fakeFileStore{
			PathsFn: func() []tsm1.FileStat {
				return data
			},
		}, tsdb.DefaultCompactFullWriteColdDuration,
	)

	tsm, pLen := cp.PlanOptimize()
	if exp, got := 1, len(tsm); exp != got {
		t.Fatalf("group length mismatch: got %v, exp %v", got, exp)
	} else if pLen != int64(len(tsm)) {
		// The plan length returned must agree with the number of planned groups.
		t.Fatalf("tsm file plan length mismatch: got %v, exp %v", pLen, int64(len(tsm)))
	}

	// Every input file should be included in the single planned group.
	if exp, got := len(data), len(tsm[0]); got != exp {
		t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
	}
}

// This test is added to account for a single generation that has a group size
// under 2 GB so it should be further compacted to a single file.
// Please see: https://github.com/influxdata/EAR/issues/5531
// FullyCompacted should NOT skip over opening this shard.
func TestDefaultPlanner_FullyCompacted_SmallSingleGeneration(t *testing.T) {
	// ~650 MB total group size, all files in generation 01.
	data := []tsm1.FileStat{
		{
			Path: "01-05.tsm1",
			Size: 300 * 1024 * 1024,
		},
		{
			Path: "01-06.tsm1",
			Size: 200 * 1024 * 1024,
		},
		{
			Path: "01-07.tsm1",
			Size: 100 * 1024 * 1024,
		},
		{
			Path: "01-08.tsm1",
			Size: 50 * 1024 * 1024,
		},
	}

	cp := tsm1.NewDefaultPlanner(
		&fakeFileStore{
			PathsFn: func() []tsm1.FileStat {
				return data
			},
		}, tsdb.DefaultCompactFullWriteColdDuration,
	)

	compacted, reason := cp.FullyCompacted()
	reasonExp := "not fully compacted and not idle because of more than one generation"
	if exp, got := false, compacted; exp != got {
		t.Fatalf("fully compacted mismatch: got %v, exp %v", got, exp)
	} else if reason != reasonExp {
		t.Fatalf("fully compacted reason mismatch: got %q, exp %q", reason, reasonExp)
	}
}

// This test is added to account for halting state after
// TestDefaultPlanner_FullyCompacted_SmallSingleGeneration:
// once the generation is a single TSM file under 2 GB we must stop compacting.
// Please see: https://github.com/influxdata/EAR/issues/5531
func TestDefaultPlanner_FullyCompacted_SmallSingleGeneration_Halt(t *testing.T) {
	// ~650 MB total group size held in one file.
	data := []tsm1.FileStat{
		{
			Path: "01-09.tsm1",
			Size: 650 * 1024 * 1024,
		},
	}

	cp := tsm1.NewDefaultPlanner(
		&fakeFileStore{
			PathsFn: func() []tsm1.FileStat {
				return data
			},
		}, tsdb.DefaultCompactFullWriteColdDuration,
	)

	compacted, reason := cp.FullyCompacted()
	// An empty reason is expected when the shard is fully compacted.
	reasonExp := ""
	if exp, got := true, compacted; exp != got {
		t.Fatalf("fully compacted mismatch: got %v, exp %v", got, exp)
	} else if reason != reasonExp {
		t.Fatalf("fully compacted reason mismatch: got %q, exp %q", reason, reasonExp)
	}
}

func TestDefaultPlanner_PlanOptimize_Multiple(t *testing.T) {
data := []tsm1.FileStat{
{
Expand Down

0 comments on commit cab638c

Please sign in to comment.