Skip to content

Commit

Permalink
feat: Modify optimized compaction to cover edge cases
Browse files Browse the repository at this point in the history
This PR changes the algorithm for compaction to account for the following
cases that were not previously accounted for:

- Many generations with a groupsize over 2 GB
- Single generation with many files and a groupsize under 2 GB
Where groupsize is the total size of the TSM files in said shard directory.

closes #25666
  • Loading branch information
devanbenz committed Dec 16, 2024
1 parent 45a8227 commit d631314
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 31 deletions.
66 changes: 43 additions & 23 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type CompactionGroup []string
type CompactionPlanner interface {
Plan(lastWrite time.Time) ([]CompactionGroup, int64)
PlanLevel(level int) ([]CompactionGroup, int64)
PlanOptimize() ([]CompactionGroup, int64)
PlanOptimize() ([]CompactionGroup, int64, int64)
Release(group []CompactionGroup)
FullyCompacted() (bool, string)

Expand Down Expand Up @@ -225,6 +225,10 @@ func (c *DefaultPlanner) FullyCompacted() (bool, string) {
} else if gens.hasTombstones() {
return false, "not fully compacted and not idle because of tombstones"
} else {
// Safety: check for first index so we don't accidentally do out of bounds access
if len(gens) == 1 && len(gens[0].files) > 1 && gens[0].size() < uint64(maxTSMFileSize) {
return false, "not fully compacted and not idle because group size under 2 GB and more then single file"
}
return true, ""
}
}
Expand Down Expand Up @@ -335,13 +339,13 @@ func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files. Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64, int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil, 0
return nil, 0, 0
}
c.mu.RUnlock()

Expand All @@ -350,10 +354,20 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
// split across several files in sequence.
generations := c.findGenerations(true)

// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil, 0
// Safety check for potential 0 generations.
if len(generations) < 1 {
return nil, 0, 0
}

// Return if there's only a single file and single generation.
if len(generations) == 1 && len(generations[0].files) == 1 {
return nil, 0, 0
}

// If there is a single generation, no tombstones, and the entire group size (all TSM files in the generation)
// is over or equal to 2 GB (Max TSM file size) there is nothing to do and we will return.
if len(generations) == 1 && generations[0].size() >= uint64(maxTSMFileSize) && !generations.hasTombstones() {
return nil, 0, 0
}

// Group each generation by level such that two adjacent generations in the same
Expand All @@ -363,11 +377,6 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
for i := 0; i < len(generations); i++ {
cur := generations[i]

// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
continue
}

// See if this generation is orphan'd which would prevent it from being further
// compacted until a final full compactin runs.
if i < len(generations)-1 {
Expand All @@ -392,21 +401,21 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
}

// Only optimize level 4 files since using lower-levels will collide
// with the level planners
// with the level planners. If this is a single generation optimization
// do not skip any levels.
var levelGroups []tsmGenerations
for _, cur := range groups {
if cur.level() == 4 {
levelGroups = append(levelGroups, cur)
if len(generations) == 1 {
levelGroups = append(levelGroups, groups...)
} else {
for _, cur := range groups {
if cur.level() == 4 {
levelGroups = append(levelGroups, cur)
}
}
}

var cGroups []CompactionGroup
for _, group := range levelGroups {
// Skip the group if it's not worthwhile to optimize it
if len(group) < 4 && !group.hasTombstones() {
continue
}

var cGroup CompactionGroup
for _, gen := range group {
for _, file := range gen.files {
Expand All @@ -417,11 +426,18 @@ func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
cGroups = append(cGroups, cGroup)
}

// The third return value is the generation length.
// Need to use this to decide how many points per block to use during compaction.
// This value is mostly ignored in normal compaction code paths, but,
// for the edge case where there is a single generation with many
// files and a group size under 2 GB we need to know that there is a single
// generation so the block size can be adjusted to allow for more optimal
// compaction.
if !c.acquire(cGroups) {
return nil, int64(len(cGroups))
return nil, int64(len(cGroups)), int64(len(generations))
}

return cGroups, int64(len(cGroups))
return cGroups, int64(len(cGroups)), int64(len(generations))
}

// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns
Expand Down Expand Up @@ -905,6 +921,10 @@ func (c *Compactor) WriteSnapshot(cache *Cache, logger *zap.Logger) ([]string, e

// compact writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger) ([]string, error) {
// Sets the points per block size. The larger this value is set
// the more points there will be a single index. Under normal
// conditions this should always be 1000 but there is an edge case
// where this is increased.
size := c.Size
if size <= 0 {
size = tsdb.DefaultMaxPointsPerBlock
Expand Down
Loading

0 comments on commit d631314

Please sign in to comment.