Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move checks for errors #157

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions pkg/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ package common

import (
"context"
"errors"
"reflect"
"runtime"
"strings"
"time"

"github.com/nuclio/errors"
nuclioerrors "github.com/nuclio/errors"
"github.com/nuclio/logger"
)

Expand Down Expand Up @@ -87,7 +88,7 @@ func RetryFunc(ctx context.Context,
time.Sleep(backoff.Duration())
} else {
if retryInterval == nil {
return errors.New("Either retry interval or backoff must be given")
return nuclioerrors.New("Either retry interval or backoff must be given")
}
time.Sleep(*retryInterval)
}
Expand All @@ -108,7 +109,7 @@ func RetryFunc(ctx context.Context,
"function", getFunctionName(fn),
"err", err,
"attempts", attempts)
return errors.New("Failed final attempt to invoke function without proper error supplied")
return nuclioerrors.New("Failed final attempt to invoke function without proper error supplied")
}
return err
}
Expand Down Expand Up @@ -192,17 +193,12 @@ func EngineErrorIsNonFatal(err error) bool {
return errorMatches(err, nonFatalEngineErrorsPartialMatch)
}

// EngineErrorIsFatal reports whether err matches one of the engine error
// message fragments treated as fatal. Matching is delegated to errorMatches.
func EngineErrorIsFatal(err error) bool {
	return errorMatches(err, []string{
		"lookup v3io-webapi: i/o timeout",
	})
}

func errorMatches(err error, substrings []string) bool {
if err != nil && len(err.Error()) > 0 {
// Unwraps the entire error chain
for e := err; e != nil; e = errors.Unwrap(e) {
errMsg := e.Error()
for _, substring := range substrings {
if strings.Contains(err.Error(), substring) || strings.Contains(errors.Cause(err).Error(), substring) {
if strings.Contains(errMsg, substring) {
return true
}
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/dataplane/streamconsumergroup/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,16 @@ func (c *claim) fetchRecordBatches(stopChannel chan struct{}, fetchInterval time
func(attempt int) (bool, error, int) {
c.currentShardLocation, err = c.getCurrentShardLocation(c.shardID)
if err != nil {
if common.EngineErrorIsNonFatal(err) {
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 0
}

// if the error is fatal and requires external resolution,
// if the error is non-fatal (e.g. a network issue),
// we don't want to fail; instead, we will inform the user via a log
if common.EngineErrorIsFatal(err) {
c.logger.ErrorWith("A fatal error occurred. Will retry until successful",
if common.EngineErrorIsNonFatal(err) {
c.logger.ErrorWith("Failed to get shard location. Will retry until successful",
"error", err,
"shard", c.shardID)
// for this type of error, we always increment the attempt counter
// this ensures the smooth operation of other components in Nuclio
// we avoid panicking and simply wait for the issue to be resolved
return true, errors.Wrap(err, "Failed to get shard location"), 1
return true, errors.Wrap(err, "Failed to get shard location due to a network error"), 1
}

// requested for an immediate stop
Expand Down