Merge branch 'master-1.x' into db/4201/compaction-bugs

devanbenz authored Jan 6, 2025
2 parents eb0a77d + e974165, commit 1bac192

Showing 6 changed files with 256 additions and 185 deletions.
3 changes: 3 additions & 0 deletions coordinator/points_writer.go
@@ -255,6 +255,9 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
			mapping.Dropped = append(mapping.Dropped, p)
			atomic.AddInt64(&w.stats.WriteDropped, 1)
			continue
+		} else if len(sg.Shards) <= 0 {
+			// Shard groups should have at least one shard.
+			return nil, fmt.Errorf("shard group %d covering %s to %s has no shards", sg.ID, sg.StartTime, sg.EndTime)
		}

		sh := sg.ShardFor(p)
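The added guard fails the write with a descriptive error instead of letting `ShardFor` pick from an empty shard list. A minimal, hypothetical sketch of the failure mode being closed off (the real `ShardGroupInfo` lives in `services/meta`; the names and field set here are stand-ins):

```go
package main

import "fmt"

// Hypothetical, pared-down stand-in for meta.ShardGroupInfo; only the
// fields this sketch needs.
type ShardGroupInfo struct {
	ID     uint64
	Shards []uint64
}

// A ShardFor-style selection typically indexes Shards with a hash
// modulo len(Shards); with zero shards that modulo would panic with
// "integer divide by zero", which is what the new guard in MapShards
// rules out before ShardFor is ever called.
func (sg ShardGroupInfo) shardFor(hash uint64) (uint64, error) {
	if len(sg.Shards) == 0 {
		return 0, fmt.Errorf("shard group %d has no shards", sg.ID)
	}
	return sg.Shards[hash%uint64(len(sg.Shards))], nil
}

func main() {
	sg := ShardGroupInfo{ID: 42} // a shard group with no shards
	if _, err := sg.shardFor(7); err != nil {
		fmt.Println(err) // shard group 42 has no shards
	}
}
```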
98 changes: 51 additions & 47 deletions services/httpd/handler.go
@@ -411,30 +411,31 @@ func (h *Handler) Close() {

// Statistics maintains statistics for the httpd service.
type Statistics struct {
	Requests                         int64
	CQRequests                       int64
	QueryRequests                    int64
	WriteRequests                    int64
	PingRequests                     int64
	StatusRequests                   int64
	WriteRequestBytesReceived        int64
	QueryRequestBytesTransmitted     int64
	PointsWrittenOK                  int64
	PointsWrittenDropped             int64
	PointsWrittenFail                int64
	AuthenticationFailures           int64
	RequestDuration                  int64
	QueryRequestDuration             int64
	WriteRequestDuration             int64
	ActiveRequests                   int64
	ActiveWriteRequests              int64
	ClientErrors                     int64
	ServerErrors                     int64
	RecoveredPanics                  int64
	PromWriteRequests                int64
	PromReadRequests                 int64
	FluxQueryRequests                int64
	FluxQueryRequestDuration         int64
+	FluxQueryRequestBytesTransmitted int64
}

// Statistics returns statistics for periodic monitoring.
@@ -443,29 +444,30 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
Name: "httpd",
Tags: tags,
Values: map[string]interface{}{
			statRequest:                          atomic.LoadInt64(&h.stats.Requests),
			statQueryRequest:                     atomic.LoadInt64(&h.stats.QueryRequests),
			statWriteRequest:                     atomic.LoadInt64(&h.stats.WriteRequests),
			statPingRequest:                      atomic.LoadInt64(&h.stats.PingRequests),
			statStatusRequest:                    atomic.LoadInt64(&h.stats.StatusRequests),
			statWriteRequestBytesReceived:        atomic.LoadInt64(&h.stats.WriteRequestBytesReceived),
			statQueryRequestBytesTransmitted:     atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted),
			statPointsWrittenOK:                  atomic.LoadInt64(&h.stats.PointsWrittenOK),
			statPointsWrittenDropped:             atomic.LoadInt64(&h.stats.PointsWrittenDropped),
			statPointsWrittenFail:                atomic.LoadInt64(&h.stats.PointsWrittenFail),
			statAuthFail:                         atomic.LoadInt64(&h.stats.AuthenticationFailures),
			statRequestDuration:                  atomic.LoadInt64(&h.stats.RequestDuration),
			statQueryRequestDuration:             atomic.LoadInt64(&h.stats.QueryRequestDuration),
			statWriteRequestDuration:             atomic.LoadInt64(&h.stats.WriteRequestDuration),
			statRequestsActive:                   atomic.LoadInt64(&h.stats.ActiveRequests),
			statWriteRequestsActive:              atomic.LoadInt64(&h.stats.ActiveWriteRequests),
			statClientError:                      atomic.LoadInt64(&h.stats.ClientErrors),
			statServerError:                      atomic.LoadInt64(&h.stats.ServerErrors),
			statRecoveredPanics:                  atomic.LoadInt64(&h.stats.RecoveredPanics),
			statPromWriteRequest:                 atomic.LoadInt64(&h.stats.PromWriteRequests),
			statPromReadRequest:                  atomic.LoadInt64(&h.stats.PromReadRequests),
			statFluxQueryRequests:                atomic.LoadInt64(&h.stats.FluxQueryRequests),
			statFluxQueryRequestDuration:         atomic.LoadInt64(&h.stats.FluxQueryRequestDuration),
+			statFluxQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.FluxQueryRequestBytesTransmitted),
},
}}
}
@@ -2161,6 +2163,8 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
		if n == 0 {
			// If the encoder did not write anything, we can write an error header.
			h.httpError(w, err.Error(), http.StatusInternalServerError)
+		} else {
+			atomic.AddInt64(&h.stats.FluxQueryRequestBytesTransmitted, int64(n))
		}
	}
}
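The error header can only be sent on the n == 0 path because Go's http.ResponseWriter commits the status line with the first body byte; after that, the handler can only account for what was transmitted. A standalone sketch of that constraint (hypothetical handler, not part of this commit):

```go
package main

import (
	"log"
	"net/http"
)

func handler(w http.ResponseWriter, r *http.Request) {
	n, _ := w.Write([]byte("partial flux response"))
	if n > 0 {
		// Too late: the first Write already put "HTTP/1.1 200 OK" on
		// the wire, so this call only logs a "superfluous WriteHeader"
		// warning. That is why serveFluxQuery reports the error when
		// n == 0 and otherwise just records the transmitted bytes.
		w.WriteHeader(http.StatusInternalServerError)
	}
}

func main() {
	http.HandleFunc("/", handler)
	log.Fatal(http.ListenAndServe("localhost:8080", nil))
}
```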
49 changes: 25 additions & 24 deletions services/httpd/service.go
@@ -20,30 +20,31 @@ import (

// statistics gathered by the httpd package.
const (
	statRequest                          = "req"                    // Number of HTTP requests served.
	statQueryRequest                     = "queryReq"               // Number of query requests served.
	statWriteRequest                     = "writeReq"               // Number of write requests served.
	statPingRequest                      = "pingReq"                // Number of ping requests served.
	statStatusRequest                    = "statusReq"              // Number of status requests served.
	statWriteRequestBytesReceived        = "writeReqBytes"          // Sum of all bytes in write requests.
	statQueryRequestBytesTransmitted     = "queryRespBytes"         // Sum of all bytes returned in query responses.
	statPointsWrittenOK                  = "pointsWrittenOK"        // Number of points written OK.
	statValuesWrittenOK                  = "valuesWrittenOK"        // Number of values (fields) written OK.
	statPointsWrittenDropped             = "pointsWrittenDropped"   // Number of points dropped by the storage engine.
	statPointsWrittenFail                = "pointsWrittenFail"      // Number of points that failed to be written.
	statAuthFail                         = "authFail"               // Number of authentication failures.
	statRequestDuration                  = "reqDurationNs"          // Number of (wall-time) nanoseconds spent inside requests.
	statQueryRequestDuration             = "queryReqDurationNs"     // Number of (wall-time) nanoseconds spent inside query requests.
	statWriteRequestDuration             = "writeReqDurationNs"     // Number of (wall-time) nanoseconds spent inside write requests.
	statRequestsActive                   = "reqActive"              // Number of currently active requests.
	statWriteRequestsActive              = "writeReqActive"         // Number of currently active write requests.
	statClientError                      = "clientError"            // Number of HTTP responses due to client error.
	statServerError                      = "serverError"            // Number of HTTP responses due to server error.
	statRecoveredPanics                  = "recoveredPanics"        // Number of panics recovered by HTTP handler.
	statPromWriteRequest                 = "promWriteReq"           // Number of write requests to the Prometheus endpoint.
	statPromReadRequest                  = "promReadReq"            // Number of read requests to the Prometheus endpoint.
	statFluxQueryRequests                = "fluxQueryReq"           // Number of Flux query requests served.
	statFluxQueryRequestDuration         = "fluxQueryReqDurationNs" // Number of (wall-time) nanoseconds spent executing Flux query requests.
+	statFluxQueryRequestBytesTransmitted = "fluxQueryRespBytes"     // Sum of all bytes returned in Flux query responses.
)
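Once the constant is wired into the Statistics map above, the counter becomes visible through the server's monitoring surfaces. A hedged sketch for spotting it via the expvar endpoint (the localhost URL, port, and key layout are assumptions about a typical 1.x setup with Flux enabled):

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// InfluxDB 1.x serves expvar-style statistics at /debug/vars;
	// after this change the httpd entries should carry
	// "fluxQueryRespBytes" alongside the other counters.
	resp, err := http.Get("http://localhost:8086/debug/vars")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var vars map[string]json.RawMessage
	if err := json.NewDecoder(resp.Body).Decode(&vars); err != nil {
		panic(err)
	}
	for key, raw := range vars {
		// Crude scan; a real consumer would decode each module's
		// values map and read the field directly.
		if strings.HasPrefix(key, "httpd") && strings.Contains(string(raw), "fluxQueryRespBytes") {
			fmt.Printf("%s: %s\n", key, raw)
		}
	}
}
```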

93 changes: 51 additions & 42 deletions tsdb/engine/tsm1/compact.go
@@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math"
"os"
"path/filepath"
@@ -64,6 +65,10 @@ func (e errCompactionInProgress) Error() string {
return "compaction in progress"
}

func (e errCompactionInProgress) Unwrap() error {
return e.err
}

type errCompactionAborted struct {
err error
}
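This Unwrap method is what lets the rewritten call sites further down inspect the wrapped cause with errors.Is and errors.As instead of type assertions and == comparisons. A minimal standalone sketch of the mechanics (the type is reproduced here so the example runs on its own):

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

// Reproduced from the diff above so the sketch is self-contained.
type errCompactionInProgress struct{ err error }

func (e errCompactionInProgress) Error() string { return "compaction in progress" }
func (e errCompactionInProgress) Unwrap() error { return e.err }

func main() {
	// The compactor wraps the os.OpenFile error; with O_EXCL that is
	// fs.ErrExist when another compaction already created the file.
	err := error(errCompactionInProgress{err: fs.ErrExist})

	var eInProgress errCompactionInProgress
	fmt.Println(errors.As(err, &eInProgress)) // true: matches the wrapper itself
	fmt.Println(errors.Is(err, fs.ErrExist))  // true: only reachable via Unwrap
}
```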
@@ -1075,6 +1080,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, logger *zap.Logger) ([]string, error) {
	// These are the new TSM files written
	var files []string
+	var eInProgress errCompactionInProgress

	for {
		sequence++
@@ -1084,25 +1090,30 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
logger.Debug("Compacting files", zap.Int("file_count", len(src)), zap.String("output_file", fileName))

// Write as much as possible to this file
err := c.write(fileName, iter, throttle, logger)
rollToNext, err := c.write(fileName, iter, throttle, logger)

// We've hit the max file limit and there is more to write. Create a new file
// and continue.
if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
if rollToNext {
// We've hit the max file limit and there is more to write. Create a new file
// and continue.
files = append(files, fileName)
logger.Debug("file size or block count exceeded, opening another output file", zap.String("output_file", fileName))
continue
} else if err == ErrNoValues {
} else if errors.Is(err, ErrNoValues) {
logger.Debug("Dropping empty file", zap.String("output_file", fileName))
// If the file only contained tombstoned entries, then it would be a 0 length
// file that we can drop.
if err := os.RemoveAll(fileName); err != nil {
return nil, err
}
break
} else if _, ok := err.(errCompactionInProgress); ok {
// Don't clean up the file as another compaction is using it. This should not happen as the
// planner keeps track of which files are assigned to compaction plans now.
} else if errors.As(err, &eInProgress) {
if !errors.Is(eInProgress.err, fs.ErrExist) {
logger.Error("error creating compaction file", zap.String("output_file", fileName), zap.Error(err))
} else {
// Don't clean up the file as another compaction is using it. This should not happen as the
// planner keeps track of which files are assigned to compaction plans now.
logger.Warn("file exists, compaction in progress already", zap.String("output_file", fileName))
}
return nil, err
} else if err != nil {
// We hit an error and didn't finish the compaction. Abort.
@@ -1124,10 +1135,10 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K
	return files, nil
}

-func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (err error) {
+func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *zap.Logger) (rollToNext bool, err error) {
	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
	if err != nil {
-		return errCompactionInProgress{err: err}
+		return false, errCompactionInProgress{err: err}
	}

	// syncingWriter ensures that whatever we wrap the above file descriptor in
@@ -1152,33 +1163,31 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
	// in memory.
	if iter.EstimatedIndexSize() > 64*1024*1024 {
		w, err = NewTSMWriterWithDiskBuffer(limitWriter)
-		if err != nil {
-			return err
-		}
	} else {
		w, err = NewTSMWriter(limitWriter)
-		if err != nil {
-			return err
-		}
	}

+	if err != nil {
+		// Close the file and return if we can't create the TSMWriter
+		return false, errors.Join(err, fd.Close())
+	}
	defer func() {
+		var eInProgress errCompactionInProgress
+
+		errs := make([]error, 0, 3)
+		errs = append(errs, err)
		closeErr := w.Close()
-		if err == nil {
-			err = closeErr
-		}
+		errs = append(errs, closeErr)

-		// Check for errors where we should not remove the file
-		_, inProgress := err.(errCompactionInProgress)
-		maxBlocks := err == ErrMaxBlocksExceeded
-		maxFileSize := err == errMaxFileExceeded
-		if inProgress || maxBlocks || maxFileSize {
+		// Check for conditions where we should not remove the file
+		inProgress := errors.As(err, &eInProgress) && errors.Is(eInProgress.err, fs.ErrExist)
+		if (closeErr == nil) && (inProgress || rollToNext) {
+			// do not join errors, there is only the one.
			return
+		} else if err != nil || closeErr != nil {
+			// Remove the file, we have had a problem
+			errs = append(errs, w.Remove())
		}

-		if err != nil {
-			_ = w.Remove()
-		}
+		err = errors.Join(errs...)
	}()

lastLogSize := w.Size()
@@ -1188,38 +1197,38 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *
		c.mu.RUnlock()

		if !enabled {
-			return errCompactionAborted{}
+			return false, errCompactionAborted{}
		}
		// Each call to read returns the next sorted key (or the prior one if there are
		// more values to write). The size of values will be less than or equal to our
		// chunk size (1000)
		key, minTime, maxTime, block, err := iter.Read()
		if err != nil {
-			return err
+			return false, err
		}

		if minTime > maxTime {
-			return fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
+			return false, fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
		}

		// Write the key and value
-		if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded {
+		if err := w.WriteBlock(key, minTime, maxTime, block); errors.Is(err, ErrMaxBlocksExceeded) {
			if err := w.WriteIndex(); err != nil {
-				return err
+				return false, err
			}
-			return err
+			return true, err
		} else if err != nil {
-			return err
+			return false, err
		}

-		// If we have a max file size configured and we're over it, close out the file
+		// If we're over maxTSMFileSize, close out the file
		// and return the error.
		if w.Size() > tsdb.MaxTSMFileSize {
			if err := w.WriteIndex(); err != nil {
-				return err
+				return false, err
			}

-			return errMaxFileExceeded
+			return true, errMaxFileExceeded
		} else if (w.Size() - lastLogSize) > logEvery {
			logger.Debug("Compaction progress", zap.String("output_file", path), zap.Uint32("size", w.Size()))
			lastLogSize = w.Size()
@@ -1228,15 +1237,15 @@ func (c *Compactor) write(path string, iter KeyIterator, throttle bool, logger *

	// Were there any errors encountered during iteration?
	if err := iter.Err(); err != nil {
-		return err
+		return false, err
	}

	// We're all done. Close out the file.
	if err := w.WriteIndex(); err != nil {
-		return err
+		return false, err
	}
	logger.Debug("Compaction finished", zap.String("output_file", path), zap.Uint32("size", w.Size()))
-	return nil
+	return false, nil
}

func (c *Compactor) add(files []string) bool {
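Taken together, the compact.go changes replace sentinel == comparisons with a returned rollToNext flag plus errors.Is/errors.As over a wrapped error chain. A runnable, condensed sketch of the caller-side contract (the names and sentinel values are stand-ins, not the real tsm1 API):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the package's sentinel errors (hypothetical values;
// the real ones live in tsdb/engine/tsm1).
var (
	errMaxFileExceeded = errors.New("max file exceeded")
	ErrNoValues        = errors.New("no values written")
)

// write mimics the new contract: the boolean means "this output file
// is full, roll to the next one", so callers stop comparing err
// against sentinels with ==, a pattern that silently breaks once
// errors get wrapped.
func write(full bool) (rollToNext bool, err error) {
	if full {
		return true, errMaxFileExceeded
	}
	return false, nil
}

func main() {
	rollToNext, err := write(true)
	switch {
	case rollToNext:
		fmt.Println("file full: keep it and open the next output file")
	case errors.Is(err, ErrNoValues):
		fmt.Println("only tombstones remained: drop the empty file")
	case err != nil:
		fmt.Println("abort compaction:", err)
	}
}
```

Separating "roll to the next file" from genuine failure also lets the deferred cleanup in write() decide whether to keep or remove the output file without re-deriving that state from sentinel comparisons.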