fix: move aside TSM file on errBlockRead #25899

Merged 2 commits on Jan 22, 2025
35 changes: 27 additions & 8 deletions tsdb/engine/tsm1/compact.go
@@ -86,6 +86,15 @@ type errBlockRead struct {
err error
}

func (e errBlockRead) Unwrap() error {
return e.err
}

func (e errBlockRead) Is(target error) bool {
_, ok := target.(errBlockRead)
return ok
}

func (e errBlockRead) Error() string {
if e.err != nil {
return fmt.Sprintf("block read error on %s: %s", e.file, e.err)
@@ -1348,6 +1357,9 @@ type tsmBatchKeyIterator struct {
// errs is any error we received while iterating values.
errs TSMErrors

// errSet is the error strings we have seen before
errSet map[string]struct{}

// indicates whether the iterator should choose a faster merging strategy over a more
// optimally compressed one. If fast is true, multiple blocks will just be added as is
// and not combined. In some cases, a slower path will need to be utilized even when
@@ -1393,13 +1405,18 @@ type tsmBatchKeyIterator struct {
overflowErrors int
}

// AppendError - store unique errors in the order of first appearance,
// up to a limit of maxErrors. If the error is unique and stored, return true.
func (t *tsmBatchKeyIterator) AppendError(err error) bool {
if t.maxErrors > len(t.errs) {
s := err.Error()
if _, ok := t.errSet[s]; ok {
return true
} else if t.maxErrors > len(t.errs) {
t.errs = append(t.errs, err)
// Was the error stored?
t.errSet[s] = struct{}{}
return true
} else {
// Was the error dropped
// Was the error dropped?
t.overflowErrors++
return false
}
@@ -1417,6 +1434,7 @@ func NewTSMBatchKeyIterator(size int, fast bool, maxErrors int, interrupt chan s
readers: readers,
values: map[string][]Value{},
pos: make([]int, len(readers)),
errSet: map[string]struct{}{},
size: size,
iterators: iter,
fast: fast,
@@ -1616,11 +1634,11 @@ func (k *tsmBatchKeyIterator) merge() {
}

func (k *tsmBatchKeyIterator) handleEncodeError(err error, typ string) {
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("encode error: unable to compress block type %s for key '%s': %v", typ, k.key, err)})
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("encode error: unable to compress block type %s for key '%s': %w", typ, k.key, err)})
}

func (k *tsmBatchKeyIterator) handleDecodeError(err error, typ string) {
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v", typ, k.key, err)})
k.AppendError(errBlockRead{k.currentTsm, fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %w", typ, k.key, err)})
}

func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
@@ -1647,6 +1665,8 @@ func (k *tsmBatchKeyIterator) Close() error {
for _, r := range k.readers {
errSlice = append(errSlice, r.Close())
}
clear(k.errSet)
k.errs = nil
return errors.Join(errSlice...)
}

@@ -1656,11 +1676,10 @@ func (k *tsmBatchKeyIterator) Err() error {
return nil
}
// Copy the errors before appending the dropped error count
var errs TSMErrors
errs = make([]error, 0, len(k.errs)+1)
errs := make([]error, 0, len(k.errs)+1)
errs = append(errs, k.errs...)
errs = append(errs, fmt.Errorf("additional errors dropped: %d", k.overflowErrors))
return errs
return errors.Join(errs...)
}

type cacheKeyIterator struct {
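Because errBlockRead now implements Unwrap and Is, and Err() returns a joined error via errors.Join, callers can match block-read failures with the standard errors helpers even through %w wrapping. Below is a minimal, self-contained sketch of that behavior; the blockReadErr type and the file name are illustrative stand-ins, not part of the patch.

package main

import (
	"errors"
	"fmt"
)

// blockReadErr mirrors the shape of errBlockRead for illustration only.
type blockReadErr struct {
	file string
	err  error
}

func (e blockReadErr) Error() string { return fmt.Sprintf("block read error on %s: %s", e.file, e.err) }
func (e blockReadErr) Unwrap() error { return e.err }
func (e blockReadErr) Is(target error) bool {
	_, ok := target.(blockReadErr)
	return ok
}

func main() {
	// An accumulated error list like the one Err() now builds with errors.Join.
	joined := errors.Join(
		blockReadErr{file: "000001-01.tsm", err: errors.New("unpackBlock: not enough data for timestamp")},
		fmt.Errorf("additional errors dropped: %d", 2),
	)

	// The custom Is method lets errors.Is match any blockReadErr in the error tree.
	fmt.Println(errors.Is(joined, blockReadErr{})) // true

	// errors.As recovers the concrete value, exposing the offending file name.
	var bre blockReadErr
	if errors.As(joined, &bre) {
		fmt.Println(bre.file) // 000001-01.tsm
	}
}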
22 changes: 11 additions & 11 deletions tsdb/engine/tsm1/compact_test.go
@@ -8,13 +8,13 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"testing"
"time"

"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

@@ -294,19 +294,19 @@ func TestCompactor_DecodeError(t *testing.T) {
compactor.FileStore = ffs

files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
if len(files) > 0 {
t.Fatalf("no files should be compacted: got %v", len(files))

}
require.Error(t, err, "expected error writing snapshot")
require.Zero(t, len(files), "no files should be compacted")

compactor.Open()

if _, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()); err == nil || !strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") {
t.Fatalf("expected error writing snapshot: %v", err)
}
_, err = compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop())

require.ErrorContains(t, err, "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp")
tsm1.MoveTsmOnReadErr(err, zap.NewNop(), func(strings []string, strings2 []string, f func([]tsm1.TSMFile)) error {
require.Equal(t, 1, len(strings))
require.Equal(t, strings[0], f3)
return nil
})
}

// Ensures that a compaction will properly merge multiple TSM files
25 changes: 15 additions & 10 deletions tsdb/engine/tsm1/engine.go
@@ -2236,16 +2236,7 @@ func (s *compactionStrategy) compactGroup() {

log.Warn("Error compacting TSM files", zap.Error(err))

// We hit a bad TSM file - rename so the next compaction can proceed.
if _, ok := err.(errBlockRead); ok {
path := err.(errBlockRead).file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.Error(err))
if err := s.fileStore.ReplaceWithCallback([]string{path}, nil, nil); err != nil {
log.Info("Error removing bad TSM file", zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.Error((err)))
}
}
MoveTsmOnReadErr(err, log, s.fileStore.ReplaceWithCallback)

s.errorStat.Inc()
time.Sleep(time.Second)
@@ -2273,6 +2264,20 @@ func (s *compactionStrategy) compactGroup() {
zap.Int("tsm1_files_n", len(files)))
}

func MoveTsmOnReadErr(err error, log *zap.Logger, ReplaceWithCallback func([]string, []string, func([]TSMFile)) error) {
var blockReadErr errBlockRead
// We hit a bad TSM file - rename so the next compaction can proceed.
if ok := errors.As(err, &blockReadErr); ok {
path := blockReadErr.file
log.Info("Renaming a corrupt TSM file due to compaction error", zap.String("file", path), zap.Error(err))
if err := ReplaceWithCallback([]string{path}, nil, nil); err != nil {
log.Info("Error removing bad TSM file", zap.String("file", path), zap.Error(err))
} else if e := os.Rename(path, path+"."+BadTSMFileExtension); e != nil {
log.Info("Error renaming corrupt TSM file", zap.String("file", path), zap.Error(err))
}
}
}

// levelCompactionStrategy returns a compactionStrategy for the given level.
// It returns nil if there are no TSM files to compact.
func (e *Engine) levelCompactionStrategy(group CompactionGroup, fast bool, level int) *compactionStrategy {
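MoveTsmOnReadErr extracts the errors.As-then-rename pattern that previously lived inline in compactGroup. The toy program below sketches that pattern in isolation, assuming the bad file is renamed by appending a dot and an extension as in the code above; the badFileErr type, moveAside helper, and "bad" extension are illustrative, not the package's API.

package main

import (
	"errors"
	"fmt"
	"os"
)

// badFileErr is a stand-in for errBlockRead: an error that names the file at fault.
type badFileErr struct{ file string }

func (e badFileErr) Error() string { return "block read error on " + e.file }

// moveAside mimics the shape of MoveTsmOnReadErr: if the error chain identifies
// a bad file, rename it out of the way so the next pass can proceed without it.
func moveAside(err error, ext string) error {
	var bfe badFileErr
	if !errors.As(err, &bfe) {
		return nil // not a bad-file error; nothing to do
	}
	return os.Rename(bfe.file, bfe.file+"."+ext)
}

func main() {
	f, err := os.CreateTemp("", "example-*.tsm")
	if err != nil {
		panic(err)
	}
	f.Close()

	compactionErr := fmt.Errorf("compaction failed: %w", badFileErr{file: f.Name()})
	if e := moveAside(compactionErr, "bad"); e != nil {
		fmt.Println("rename failed:", e)
	} else {
		fmt.Println("moved aside:", f.Name()+".bad")
	}
}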
70 changes: 38 additions & 32 deletions tsdb/engine/tsm1/reader_test.go
@@ -1,12 +1,12 @@
package tsm1

import (
"errors"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strings"
"testing"

"github.com/stretchr/testify/require"
@@ -1841,8 +1841,22 @@ func TestTSMReader_References(t *testing.T) {
}

func TestBatchKeyIterator_Errors(t *testing.T) {
const MaxErrors = 10

const testFile = "testFile.tsm"
errorCases := []error{
fmt.Errorf("test error 0"),
errBlockRead{
file: testFile,
err: fmt.Errorf("decode error: unable to decompress block type %s for key '%s': %v",
"string", "summary#!~#mfu_estimated_percent", fmt.Errorf("test invalid error 1"))},
fmt.Errorf("test error 2"),
// Duplicate error - should be stored once, but not counted as dropped.
fmt.Errorf("test error 2"),
fmt.Errorf("test error 3"),
fmt.Errorf("test error 4"),
}

// Store all but the last error
MaxErrors := len(errorCases) - 2
dir, name := createTestTSM(t)
defer os.RemoveAll(dir)
fr, err := os.Open(name)
@@ -1853,41 +1867,33 @@ func TestBatchKeyIterator_Errors(t *testing.T) {
if err != nil {
// Only have a deferred close if we could not create the TSMReader
defer func() {
if e := fr.Close(); e != nil {
t.Fatalf("unexpected error closing %s: %v", name, e)
}
require.NoError(t, fr.Close(), "unexpected error closing %s", name)
}()
} else {
defer func() {
require.NoError(t, r.Close(), "unexpected error closing TSMReader for %s", name)
}()

t.Fatalf("unexpected error creating TSMReader for %s: %v", name, err)
}
defer func() {
if e := r.Close(); e != nil {
t.Fatalf("error closing TSMReader for %s: %v", name, e)
}
}()
require.NoError(t, err, "unexpected error creating TSMReader for %s", name)
interrupts := make(chan struct{})
var iter KeyIterator
if iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r); err != nil {
t.Fatalf("unexpected error creating tsmBatchKeyIterator: %v", err)
}
var i int
for i = 0; i < MaxErrors*2; i++ {
saved := iter.(*tsmBatchKeyIterator).AppendError(fmt.Errorf("fake error: %d", i))
if i < MaxErrors && !saved {
t.Fatalf("error unexpectedly not saved: %d", i)
iter, err = NewTSMBatchKeyIterator(3, false, MaxErrors, interrupts, []string{name}, r)
require.NoError(t, err, "unexpected error creating tsmBatchKeyIterator")

for i := 0; i < 2; i++ {
for _, e := range errorCases {
saved := iter.(*tsmBatchKeyIterator).AppendError(e)
savedErrs := iter.(*tsmBatchKeyIterator).Err().(interface{ Unwrap() []error }).Unwrap()
require.False(t, len(savedErrs) < MaxErrors && !saved, "error not saved when it should have been: %v", e)
}
if i >= MaxErrors && saved {
t.Fatalf("error unexpectedly saved: %d", i)
}
}
errs := iter.Err()
if errCnt := len(errs.(TSMErrors)); errCnt != (MaxErrors + 1) {
t.Fatalf("saved wrong number of errors: expected %d, got %d", MaxErrors, errCnt)
}
expected := fmt.Sprintf("additional errors dropped: %d", i-MaxErrors)
if strings.Compare(errs.(TSMErrors)[MaxErrors].Error(), expected) != 0 {
t.Fatalf("expected: '%s', got: '%s", expected, errs.(TSMErrors)[MaxErrors].Error())
}
var blockReadError errBlockRead
iterErr := iter.Err()
joinErr, ok := iterErr.(interface{ Unwrap() []error })
require.True(t, ok, "errs does not implement Unwrap() as a joinError should: %T", iterErr)
require.Equal(t, 1+MaxErrors, len(joinErr.Unwrap()), "saved wrong number of errors")
require.True(t, errors.As(iterErr, &blockReadError), "expected errBlockRead error, got %T", err)
require.Equal(t, testFile, blockReadError.file, "unexpected file name in error")
}

func createTestTSM(t *testing.T) (dir string, name string) {
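The updated test asserts the iterator's Err() result to interface{ Unwrap() []error }, which is how a joined error exposes its individual members. A short standalone sketch of that idiom follows; the error messages are made up for illustration.

package main

import (
	"errors"
	"fmt"
)

func main() {
	joined := errors.Join(
		errors.New("decode error: block 1"),
		errors.New("decode error: block 2"),
		fmt.Errorf("additional errors dropped: %d", 3),
	)

	// errors.Join returns a value whose concrete type implements Unwrap() []error;
	// asserting to that interface exposes the individual errors for counting or
	// inspection, as the test does with the iterator's Err() result.
	if u, ok := joined.(interface{ Unwrap() []error }); ok {
		fmt.Println(len(u.Unwrap())) // 3
	}
}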