From db523227a2c8d8a05e317dbe0f1b9630b05838c6 Mon Sep 17 00:00:00 2001 From: cpinflux <146751324+cpinflux@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:35:45 +0000 Subject: [PATCH 1/3] feat: Added fluxQueryRespBytes metric to 1.x /debug/vars (#25669) This PR adds an additional statistic "fluxQueryRespBytes" to the output of /debug/vars, in turn making it available to Telegraf and other monitoring tools. Closes https://github.com/influxdata/influxdb/issues/25671 --- services/httpd/handler.go | 98 ++++++++++++++++++++------------------- services/httpd/service.go | 49 ++++++++++---------- 2 files changed, 76 insertions(+), 71 deletions(-) diff --git a/services/httpd/handler.go b/services/httpd/handler.go index ad856c37124..396dfd0a210 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -411,30 +411,31 @@ func (h *Handler) Close() { // Statistics maintains statistics for the httpd service. type Statistics struct { - Requests int64 - CQRequests int64 - QueryRequests int64 - WriteRequests int64 - PingRequests int64 - StatusRequests int64 - WriteRequestBytesReceived int64 - QueryRequestBytesTransmitted int64 - PointsWrittenOK int64 - PointsWrittenDropped int64 - PointsWrittenFail int64 - AuthenticationFailures int64 - RequestDuration int64 - QueryRequestDuration int64 - WriteRequestDuration int64 - ActiveRequests int64 - ActiveWriteRequests int64 - ClientErrors int64 - ServerErrors int64 - RecoveredPanics int64 - PromWriteRequests int64 - PromReadRequests int64 - FluxQueryRequests int64 - FluxQueryRequestDuration int64 + Requests int64 + CQRequests int64 + QueryRequests int64 + WriteRequests int64 + PingRequests int64 + StatusRequests int64 + WriteRequestBytesReceived int64 + QueryRequestBytesTransmitted int64 + PointsWrittenOK int64 + PointsWrittenDropped int64 + PointsWrittenFail int64 + AuthenticationFailures int64 + RequestDuration int64 + QueryRequestDuration int64 + WriteRequestDuration int64 + ActiveRequests int64 + ActiveWriteRequests int64 + ClientErrors int64 + ServerErrors int64 + RecoveredPanics int64 + PromWriteRequests int64 + PromReadRequests int64 + FluxQueryRequests int64 + FluxQueryRequestDuration int64 + FluxQueryRequestBytesTransmitted int64 } // Statistics returns statistics for periodic monitoring. 
@@ -443,29 +444,30 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic { Name: "httpd", Tags: tags, Values: map[string]interface{}{ - statRequest: atomic.LoadInt64(&h.stats.Requests), - statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests), - statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests), - statPingRequest: atomic.LoadInt64(&h.stats.PingRequests), - statStatusRequest: atomic.LoadInt64(&h.stats.StatusRequests), - statWriteRequestBytesReceived: atomic.LoadInt64(&h.stats.WriteRequestBytesReceived), - statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted), - statPointsWrittenOK: atomic.LoadInt64(&h.stats.PointsWrittenOK), - statPointsWrittenDropped: atomic.LoadInt64(&h.stats.PointsWrittenDropped), - statPointsWrittenFail: atomic.LoadInt64(&h.stats.PointsWrittenFail), - statAuthFail: atomic.LoadInt64(&h.stats.AuthenticationFailures), - statRequestDuration: atomic.LoadInt64(&h.stats.RequestDuration), - statQueryRequestDuration: atomic.LoadInt64(&h.stats.QueryRequestDuration), - statWriteRequestDuration: atomic.LoadInt64(&h.stats.WriteRequestDuration), - statRequestsActive: atomic.LoadInt64(&h.stats.ActiveRequests), - statWriteRequestsActive: atomic.LoadInt64(&h.stats.ActiveWriteRequests), - statClientError: atomic.LoadInt64(&h.stats.ClientErrors), - statServerError: atomic.LoadInt64(&h.stats.ServerErrors), - statRecoveredPanics: atomic.LoadInt64(&h.stats.RecoveredPanics), - statPromWriteRequest: atomic.LoadInt64(&h.stats.PromWriteRequests), - statPromReadRequest: atomic.LoadInt64(&h.stats.PromReadRequests), - statFluxQueryRequests: atomic.LoadInt64(&h.stats.FluxQueryRequests), - statFluxQueryRequestDuration: atomic.LoadInt64(&h.stats.FluxQueryRequestDuration), + statRequest: atomic.LoadInt64(&h.stats.Requests), + statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests), + statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests), + statPingRequest: atomic.LoadInt64(&h.stats.PingRequests), + statStatusRequest: atomic.LoadInt64(&h.stats.StatusRequests), + statWriteRequestBytesReceived: atomic.LoadInt64(&h.stats.WriteRequestBytesReceived), + statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted), + statPointsWrittenOK: atomic.LoadInt64(&h.stats.PointsWrittenOK), + statPointsWrittenDropped: atomic.LoadInt64(&h.stats.PointsWrittenDropped), + statPointsWrittenFail: atomic.LoadInt64(&h.stats.PointsWrittenFail), + statAuthFail: atomic.LoadInt64(&h.stats.AuthenticationFailures), + statRequestDuration: atomic.LoadInt64(&h.stats.RequestDuration), + statQueryRequestDuration: atomic.LoadInt64(&h.stats.QueryRequestDuration), + statWriteRequestDuration: atomic.LoadInt64(&h.stats.WriteRequestDuration), + statRequestsActive: atomic.LoadInt64(&h.stats.ActiveRequests), + statWriteRequestsActive: atomic.LoadInt64(&h.stats.ActiveWriteRequests), + statClientError: atomic.LoadInt64(&h.stats.ClientErrors), + statServerError: atomic.LoadInt64(&h.stats.ServerErrors), + statRecoveredPanics: atomic.LoadInt64(&h.stats.RecoveredPanics), + statPromWriteRequest: atomic.LoadInt64(&h.stats.PromWriteRequests), + statPromReadRequest: atomic.LoadInt64(&h.stats.PromReadRequests), + statFluxQueryRequests: atomic.LoadInt64(&h.stats.FluxQueryRequests), + statFluxQueryRequestDuration: atomic.LoadInt64(&h.stats.FluxQueryRequestDuration), + statFluxQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.FluxQueryRequestBytesTransmitted), }, }} } @@ -2161,6 +2163,8 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r 
*http.Request, user me if n == 0 { // If the encoder did not write anything, we can write an error header. h.httpError(w, err.Error(), http.StatusInternalServerError) + } else { + atomic.AddInt64(&h.stats.FluxQueryRequestBytesTransmitted, int64(n)) } } } diff --git a/services/httpd/service.go b/services/httpd/service.go index ee0b0b4941e..677fb893d45 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -20,30 +20,31 @@ import ( // statistics gathered by the httpd package. const ( - statRequest = "req" // Number of HTTP requests served. - statQueryRequest = "queryReq" // Number of query requests served. - statWriteRequest = "writeReq" // Number of write requests serverd. - statPingRequest = "pingReq" // Number of ping requests served. - statStatusRequest = "statusReq" // Number of status requests served. - statWriteRequestBytesReceived = "writeReqBytes" // Sum of all bytes in write requests. - statQueryRequestBytesTransmitted = "queryRespBytes" // Sum of all bytes returned in query reponses. - statPointsWrittenOK = "pointsWrittenOK" // Number of points written OK. - statValuesWrittenOK = "valuesWrittenOK" // Number of values (fields) written OK. - statPointsWrittenDropped = "pointsWrittenDropped" // Number of points dropped by the storage engine. - statPointsWrittenFail = "pointsWrittenFail" // Number of points that failed to be written. - statAuthFail = "authFail" // Number of authentication failures. - statRequestDuration = "reqDurationNs" // Number of (wall-time) nanoseconds spent inside requests. - statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests. - statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests. - statRequestsActive = "reqActive" // Number of currently active requests. - statWriteRequestsActive = "writeReqActive" // Number of currently active write requests. - statClientError = "clientError" // Number of HTTP responses due to client error. - statServerError = "serverError" // Number of HTTP responses due to server error. - statRecoveredPanics = "recoveredPanics" // Number of panics recovered by HTTP handler. - statPromWriteRequest = "promWriteReq" // Number of write requests to the prometheus endpoint. - statPromReadRequest = "promReadReq" // Number of read requests to the prometheus endpoint. - statFluxQueryRequests = "fluxQueryReq" // Number of flux query requests served. - statFluxQueryRequestDuration = "fluxQueryReqDurationNs" // Number of (wall-time) nanoseconds spent executing Flux query requests. + statRequest = "req" // Number of HTTP requests served. + statQueryRequest = "queryReq" // Number of query requests served. + statWriteRequest = "writeReq" // Number of write requests serverd. + statPingRequest = "pingReq" // Number of ping requests served. + statStatusRequest = "statusReq" // Number of status requests served. + statWriteRequestBytesReceived = "writeReqBytes" // Sum of all bytes in write requests. + statQueryRequestBytesTransmitted = "queryRespBytes" // Sum of all bytes returned in query reponses. + statPointsWrittenOK = "pointsWrittenOK" // Number of points written OK. + statValuesWrittenOK = "valuesWrittenOK" // Number of values (fields) written OK. + statPointsWrittenDropped = "pointsWrittenDropped" // Number of points dropped by the storage engine. + statPointsWrittenFail = "pointsWrittenFail" // Number of points that failed to be written. + statAuthFail = "authFail" // Number of authentication failures. 
+ statRequestDuration = "reqDurationNs" // Number of (wall-time) nanoseconds spent inside requests.
+ statQueryRequestDuration = "queryReqDurationNs" // Number of (wall-time) nanoseconds spent inside query requests.
+ statWriteRequestDuration = "writeReqDurationNs" // Number of (wall-time) nanoseconds spent inside write requests.
+ statRequestsActive = "reqActive" // Number of currently active requests.
+ statWriteRequestsActive = "writeReqActive" // Number of currently active write requests.
+ statClientError = "clientError" // Number of HTTP responses due to client error.
+ statServerError = "serverError" // Number of HTTP responses due to server error.
+ statRecoveredPanics = "recoveredPanics" // Number of panics recovered by HTTP handler.
+ statPromWriteRequest = "promWriteReq" // Number of write requests to the prometheus endpoint.
+ statPromReadRequest = "promReadReq" // Number of read requests to the prometheus endpoint.
+ statFluxQueryRequests = "fluxQueryReq" // Number of flux query requests served.
+ statFluxQueryRequestDuration = "fluxQueryReqDurationNs" // Number of (wall-time) nanoseconds spent executing Flux query requests.
+ statFluxQueryRequestBytesTransmitted = "fluxQueryRespBytes" // Sum of all bytes returned in Flux query responses.
)

From 694607a22c190ed648de46ef029ef2b74d2cc064 Mon Sep 17 00:00:00 2001
From: davidby-influx <72418212+davidby-influx@users.noreply.github.com>
Date: Fri, 27 Dec 2024 14:30:01 -0800
Subject: [PATCH 2/3] fix: avoid panic if shard group has no shards (#25717) (#25719)

Avoid panicking when mapping points to a shard group that has no shards.
This does not address the root problem of how the shard group ended up
with no shards.

helps: https://github.com/influxdata/influxdb/issues/25715

(cherry picked from commit 5b364b51c865f7acc18756f8dd1362bc92992a6d)

closes: https://github.com/influxdata/influxdb/issues/25718
---
 coordinator/points_writer.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go
index e97c9c2dd2d..55fddc4ddbe 100644
--- a/coordinator/points_writer.go
+++ b/coordinator/points_writer.go
@@ -255,6 +255,9 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
 			mapping.Dropped = append(mapping.Dropped, p)
 			atomic.AddInt64(&w.stats.WriteDropped, 1)
 			continue
+		} else if len(sg.Shards) <= 0 {
+			// Shard groups should have at least one shard.
+			return nil, fmt.Errorf("shard group %d covering %s to %s has no shards", sg.ID, sg.StartTime, sg.EndTime)
 		}

 		sh := sg.ShardFor(p)

From e974165d252a1904b5ede5b88104301b32842b81 Mon Sep 17 00:00:00 2001
From: davidby-influx <72418212+davidby-influx@users.noreply.github.com>
Date: Fri, 3 Jan 2025 14:43:41 -0800
Subject: [PATCH 3/3] fix: do not leak file handles from Compactor.write (#25725)

There are a number of code paths in Compactor.write which, on error, can
leak file handles to temporary files. This, in turn, prevents the removal
of the temporary files until InfluxDB is restarted, releasing the file
handles.
closes https://github.com/influxdata/influxdb/issues/25724 --- tsdb/engine/tsm1/compact.go | 93 ++++++++++++---------- tsdb/engine/tsm1/compact_test.go | 130 ++++++++++++++++++++++--------- tsdb/engine/tsm1/writer.go | 66 ++++++++-------- 3 files changed, 176 insertions(+), 113 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index c4be1156766..6da71e136cf 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "io" + "io/fs" "math" "os" "path/filepath" @@ -65,6 +66,10 @@ func (e errCompactionInProgress) Error() string { return "compaction in progress" } +func (e errCompactionInProgress) Unwrap() error { + return e.err +} + type errCompactionAborted struct { err error } @@ -1051,6 +1056,7 @@ func (c *Compactor) removeTmpFiles(files []string) error { func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) { // These are the new TSM files written var files []string + var eInProgress errCompactionInProgress for { sequence++ @@ -1060,15 +1066,15 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName)) // Write as much as possible to this file - err := c.write(fileName, iter, throttle, logger) + rollToNext, err := c.write(fileName, iter, throttle, logger) - // We've hit the max file limit and there is more to write. Create a new file - // and continue. - if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded { + if rollToNext { + // We've hit the max file limit and there is more to write. Create a new file + // and continue. files = append(files, fileName) logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName)) continue - } else if err == ErrNoValues { + } else if errors.Is(err, ErrNoValues) { logger.Debug("Dropping empty file", zap.String("output_file", fileName)) // If the file only contained tombstoned entries, then it would be a 0 length // file that we can drop. @@ -1076,9 +1082,14 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K return nil, err } break - } else if _, ok := err.(errCompactionInProgress); ok { - // Don't clean up the file as another compaction is using it. This should not happen as the - // planner keeps track of which files are assigned to compaction plans now. + } else if errors.As(err, &eInProgress) { + if !errors.Is(eInProgress.err, fs.ErrExist) { + logger.Error("error creating compaction file", zap.String("output_file", fileName), zap.Error(err)) + } else { + // Don't clean up the file as another compaction is using it. This should not happen as the + // planner keeps track of which files are assigned to compaction plans now. + logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName)) + } return nil, err } else if err != nil { // We hit an error and didn't finish the compaction. Abort. 
@@ -1100,10 +1111,10 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K return files, nil } -func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) { +func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (rollToNext bool, err error) { fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666) if err != nil { - return errCompactionInProgress{err: err} + return false, errCompactionInProgress{err: err} } // syncingWriter ensures that whatever we wrap the above file descriptor in @@ -1128,33 +1139,31 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger * // in memory. if iter.EstimatedIndexSize() > 64*1024*1024 { w, err = NewTSMWriterWithDiskBuffer(limitWriter) - if err != nil { - return err - } } else { w, err = NewTSMWriter(limitWriter) - if err != nil { - return err - } } - + if err != nil { + // Close the file and return if we can't create the TSMWriter + return false, errors.Join(err, fd.Close()) + } defer func() { + var eInProgress errCompactionInProgress + + errs := make([]error, 0, 3) + errs = append(errs, err) closeErr := w.Close() - if err == nil { - err = closeErr - } + errs = append(errs, closeErr) - // Check for errors where we should not remove the file - _, inProgress := err.(errCompactionInProgress) - maxBlocks := err == ErrMaxBlocksExceeded - maxFileSize := err == errMaxFileExceeded - if inProgress || maxBlocks || maxFileSize { + // Check for conditions where we should not remove the file + inProgress := errors.As(err, &eInProgress) && errors.Is(eInProgress.err, fs.ErrExist) + if (closeErr == nil) && (inProgress || rollToNext) { + // do not join errors, there is only the one. return + } else if err != nil || closeErr != nil { + // Remove the file, we have had a problem + errs = append(errs, w.Remove()) } - - if err != nil { - _ = w.Remove() - } + err = errors.Join(errs...) }() lastLogSize := w.Size() @@ -1164,38 +1173,38 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger * c.mu.RUnlock() if !enabled { - return errCompactionAborted{} + return false, errCompactionAborted{} } // Each call to read returns the next sorted key (or the prior one if there are // more values to write). The size of values will be less than or equal to our // chunk size (1000) key, minTime, maxTime, block, err := iter.Read() if err != nil { - return err + return false, err } if minTime > maxTime { - return fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime) + return false, fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime) } // Write the key and value - if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded { + if err := w.WriteBlock(key, minTime, maxTime, block); errors.Is(err, ErrMaxBlocksExceeded) { if err := w.WriteIndex(); err != nil { - return err + return false, err } - return err + return true, err } else if err != nil { - return err + return false, err } - // If we have a max file size configured and we're over it, close out the file + // If we're over maxTSMFileSize, close out the file // and return the error. 
if w.Size() > maxTSMFileSize { if err := w.WriteIndex(); err != nil { - return err + return false, err } - return errMaxFileExceeded + return true, errMaxFileExceeded } else if (w.Size() - lastLogSize) > logEvery { logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size())) lastLogSize = w.Size() @@ -1204,15 +1213,15 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger * // Were there any errors encountered during iteration? if err := iter.Err(); err != nil { - return err + return false, err } // We're all done. Close out the file. if err := w.WriteIndex(); err != nil { - return err + return false, err } logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size())) - return nil + return false, nil } func (c *Compactor) add(files []string) bool { diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index c90beb6b1cf..ccfa6d4f469 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -1,7 +1,9 @@ package tsm1_test import ( + "errors" "fmt" + "io/fs" "math" "os" "path/filepath" @@ -12,6 +14,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/engine/tsm1" + "github.com/stretchr/testify/assert" "go.uber.org/zap" ) @@ -113,11 +116,11 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) { } f2 := MustWriteTSM(dir, 2, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Open() files, err := compactor.CompactFull([]string{f1, f2}, zap.NewNop()) @@ -170,11 +173,11 @@ func TestCompactor_CompactFull(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()) if err == nil { @@ -280,11 +283,11 @@ func TestCompactor_DecodeError(t *testing.T) { f.WriteAt([]byte("ffff"), 10) // skip over header f.Close() - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs files, err := compactor.CompactFull([]string{f1, f2, f3}, zap.NewNop()) if err == nil { @@ -326,11 +329,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -406,11 +409,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -620,11 +623,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + 
compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -722,11 +725,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -825,11 +828,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -933,11 +936,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { } f3 := MustWriteTSM(dir, 3, writes) - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Size = 2 compactor.Open() @@ -1049,11 +1052,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) { } f2.Close() - fs := &fakeFileStore{} - defer fs.Close() + ffs := &fakeFileStore{} + defer ffs.Close() compactor := tsm1.NewCompactor() compactor.Dir = dir - compactor.FileStore = fs + compactor.FileStore = ffs compactor.Open() // Compact both files, should get 2 files back @@ -1086,6 +1089,61 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) { } } +func TestCompactor_CompactFull_InProgress(t *testing.T) { + // This test creates a lot of data and causes timeout failures for these envs + if testing.Short() || os.Getenv("CI") != "" || os.Getenv("GORACE") != "" { + t.Skip("Skipping in progress compaction test") + } + dir := MustTempDir() + defer os.RemoveAll(dir) + + f2Name := func() string { + values := make([]tsm1.Value, 1000) + + // Write a new file with 2 blocks + f2, f2Name := MustTSMWriter(dir, 2) + defer func() { + assert.NoError(t, f2.Close(), "closing TSM file %s", f2Name) + }() + for i := 0; i < 2; i++ { + values = values[:0] + for j := 0; j < 1000; j++ { + values = append(values, tsm1.NewValue(int64(i*1000+j), int64(1))) + } + assert.NoError(t, f2.Write([]byte("cpu,host=A#!~#value"), values), "writing TSM file: %s", f2Name) + } + assert.NoError(t, f2.WriteIndex(), "writing TSM file index for %s", f2Name) + return f2Name + }() + ffs := &fakeFileStore{} + defer ffs.Close() + compactor := tsm1.NewCompactor() + compactor.Dir = dir + compactor.FileStore = ffs + compactor.Open() + + expGen, expSeq, err := tsm1.DefaultParseFileName(f2Name) + assert.NoError(t, err, "unexpected error parsing file name %s", f2Name) + expSeq = expSeq + 1 + + fileName := filepath.Join(compactor.Dir, tsm1.DefaultFormatFileName(expGen, expSeq)+"."+tsm1.TSMFileExtension+"."+tsm1.TmpTSMFileExtension) + + // Create a temp file to simulate an in progress compaction + f, err := os.Create(fileName) + assert.NoError(t, err, "creating in-progress compaction file %s", fileName) + defer func() { + assert.NoError(t, f.Close(), "closing in-progress compaction file %s", fileName) + }() + _, err = compactor.CompactFull([]string{f2Name}, zap.NewNop()) + assert.Errorf(t, err, "expected an error writing snapshot for %s", f2Name) + e := errors.Unwrap(err) + assert.NotNil(t, e, "expected an error wrapped by errCompactionInProgress") + assert.Truef(t, errors.Is(e, fs.ErrExist), "error did not indicate file existence: 
%v", e) + pathErr := &os.PathError{} + assert.Truef(t, errors.As(e, &pathErr), "expected path error, got %v", e) + assert.Truef(t, errors.Is(pathErr, fs.ErrExist), "error did not indicate file existence: %v", pathErr) +} + func newTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*tsm1.TSMReader) (tsm1.KeyIterator, error) { files := []string{} for _, r := range readers { @@ -2529,14 +2587,14 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { }, } - fs := &fakeFileStore{ + ffs := &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return testSet }, blockCount: 1000, } - cp := tsm1.NewDefaultPlanner(fs, time.Nanosecond) + cp := tsm1.NewDefaultPlanner(ffs, time.Nanosecond) plan, pLen := cp.Plan(time.Now().Add(-time.Second)) // first verify that our test set would return files if exp, got := 4, len(plan[0]); got != exp { @@ -2595,9 +2653,9 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { } cp.Release(plan) - cp.FileStore = fs + cp.FileStore = ffs // ensure that it will plan if last modified has changed - fs.lastModified = time.Now() + ffs.lastModified = time.Now() cGroups, pLen := cp.Plan(time.Now()) if exp, got := 4, len(cGroups[0]); got != exp { @@ -2693,7 +2751,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { }, } - fs := &fakeFileStore{ + ffs := &fakeFileStore{ PathsFn: func() []tsm1.FileStat { return testSet }, @@ -2701,7 +2759,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - fs, + ffs, time.Nanosecond, ) diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index 4784dc6d558..70f0fd77e22 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -66,6 +66,7 @@ import ( "bufio" "bytes" "encoding/binary" + "errors" "fmt" "hash/crc32" "io" @@ -511,19 +512,15 @@ func (d *directIndex) Size() uint32 { } func (d *directIndex) Close() error { + errs := make([]error, 0, 3) // Flush anything remaining in the index - if err := d.w.Flush(); err != nil { - return err - } - - if d.fd == nil { - return nil + errs = append(errs, d.w.Flush()) + if d.fd != nil { + // Close and remove the temporary index file + errs = append(errs, d.fd.Close()) + errs = append(errs, os.Remove(d.fd.Name())) } - - if err := d.fd.Close(); err != nil { - return err - } - return os.Remove(d.fd.Name()) + return errors.Join(errs...) } // Remove removes the index from any tempory storage @@ -532,11 +529,14 @@ func (d *directIndex) Remove() error { return nil } - // Close the file handle to prevent leaking. We ignore the error because - // we just want to cleanup and remove the file. - _ = d.fd.Close() - - return os.Remove(d.fd.Name()) + errs := make([]error, 0, 2) + // Close the file handle to prevent leaking. + // We don't let an error stop the removal. + if err := d.fd.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + errs = append(errs, err) + } + errs = append(errs, os.Remove(d.fd.Name())) + return errors.Join(errs...) } // tsmWriter writes keys and values in the TSM format @@ -756,25 +756,19 @@ func (t *tsmWriter) sync() error { } func (t *tsmWriter) Close() error { - if err := t.Flush(); err != nil { - return err - } - - if err := t.index.Close(); err != nil { - return err - } - + errs := make([]error, 0, 3) + errs = append(errs, t.Flush()) + errs = append(errs, t.index.Close()) if c, ok := t.wrapped.(io.Closer); ok { - return c.Close() + errs = append(errs, c.Close()) } - return nil + return errors.Join(errs...) 
} // Remove removes any temporary storage used by the writer. func (t *tsmWriter) Remove() error { - if err := t.index.Remove(); err != nil { - return err - } + errs := make([]error, 0, 3) + errs = append(errs, t.index.Remove()) // nameCloser is the most permissive interface we can close the wrapped // value with. @@ -783,14 +777,16 @@ func (t *tsmWriter) Remove() error { Name() string } + // If the writer is not a memory buffer, we can remove the file. if f, ok := t.wrapped.(nameCloser); ok { - // Close the file handle to prevent leaking. We ignore the error because - // we just want to cleanup and remove the file. - _ = f.Close() - - return os.Remove(f.Name()) + // Close the file handle to prevent leaking. + if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + errs = append(errs, err) + } + // Remove the file + errs = append(errs, os.Remove(f.Name())) } - return nil + return errors.Join(errs...) } func (t *tsmWriter) Size() uint32 {
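
Illustrative usage sketches (editorial additions, not part of the patches above):

Patch 1/3 exposes the new "fluxQueryRespBytes" statistic through /debug/vars. The sketch below is a minimal, hypothetical client for reading it back; it assumes a local InfluxDB 1.x instance on localhost:8086 and the usual expvar layout in which each statistic is emitted as an object carrying "name", "tags", and "values" fields. None of this code appears in the patch series.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Assumption: a local 1.x instance; adjust the address for your deployment.
	resp, err := http.Get("http://localhost:8086/debug/vars")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Decode the whole expvar document; entries we do not inspect stay as raw JSON.
	var vars map[string]json.RawMessage
	if err := json.NewDecoder(resp.Body).Decode(&vars); err != nil {
		panic(err)
	}

	// After this series, each httpd statistic should include fluxQueryRespBytes
	// alongside fluxQueryReq and fluxQueryReqDurationNs.
	type stat struct {
		Name   string                 `json:"name"`
		Values map[string]interface{} `json:"values"`
	}
	for _, raw := range vars {
		var s stat
		if json.Unmarshal(raw, &s) != nil || s.Name != "httpd" {
			continue // skip entries that are not httpd statistics
		}
		fmt.Println("fluxQueryRespBytes:", s.Values["fluxQueryRespBytes"])
	}
}

Patch 3/3 reworks Compactor.write so that Close and Remove errors are joined into the returned error instead of being skipped on early returns, which is what closes the file-handle leaks. The following sketch shows that cleanup shape in isolation; writeTemp and fill are hypothetical names used only for illustration and are not part of the patch.

package sketch

import (
	"errors"
	"io"
	"os"
)

// writeTemp opens a temporary file exclusively, lets fill write into it, and joins the
// write, Close, and (on failure) Remove errors so a failed write cannot leak the handle.
func writeTemp(path string, fill func(io.Writer) error) (err error) {
	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
	if err != nil {
		return err
	}
	defer func() {
		errs := []error{err, fd.Close()}
		if err != nil {
			// The write failed: also try to remove the partial file.
			errs = append(errs, os.Remove(path))
		}
		// errors.Join drops nil entries, so the success path still returns nil.
		err = errors.Join(errs...)
	}()
	return fill(fd)
}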