Proper retries (#1684)
* Proper retries

* wip
piotrmiskiewicz authored Jan 27, 2025
1 parent 523fdb9 commit 4e4d012
Showing 2 changed files with 52 additions and 7 deletions.
53 changes: 50 additions & 3 deletions internal/process/operation_manager.go
@@ -3,6 +3,7 @@ package process
import (
"fmt"
"log/slog"
"sync"
"time"

"github.com/kyma-project/kyma-environment-broker/common/orchestration"
@@ -17,10 +18,38 @@ type OperationManager struct {
storage storage.Operations
component kebErr.Component
step string

// stores timestamp to calculate timeout in retry* methods, the key is the operation.ID
retryTimestamps map[string]time.Time
mu sync.RWMutex
}

func NewOperationManager(storage storage.Operations, step string, component kebErr.Component) *OperationManager {
return &OperationManager{storage: storage, component: component, step: step}
op := &OperationManager{storage: storage, component: component, step: step, retryTimestamps: make(map[string]time.Time)}
go func() {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
<-ticker.C
runTimestampGC(op, step)
}
}()
return op
}

func runTimestampGC(op *OperationManager, step string) {
numberOfDeletions := 0
op.mu.Lock()
for opId, ts := range op.retryTimestamps {
if time.Since(ts) > 48*time.Hour {
delete(op.retryTimestamps, opId)
numberOfDeletions++
}
}
op.mu.Unlock()
if numberOfDeletions > 0 {
slog.Info("Operation Manager for step %s has deleted %d old timestamps", step, numberOfDeletions)
}
}

// OperationSucceeded marks the operation as succeeded and returns status of the operation's update
@@ -69,9 +98,12 @@ func (om *OperationManager) OperationCanceled(operation internal.Operation, desc
func (om *OperationManager) RetryOperation(operation internal.Operation, errorMessage string, err error, retryInterval time.Duration, maxTime time.Duration, log *slog.Logger) (internal.Operation, time.Duration, error) {
log.Info(fmt.Sprintf("Retry Operation was triggered with message: %s", errorMessage))
log.Info(fmt.Sprintf("Retrying for %s in %s steps", maxTime.String(), retryInterval.String()))
if time.Since(operation.UpdatedAt) < maxTime {

om.storeTimestampIfMissing(operation.ID)
if !om.isTimeoutOccurred(operation.ID, maxTime) {
return operation, retryInterval, nil
}

log.Error(fmt.Sprintf("Aborting after %s of failing retries", maxTime.String()))
op, retry, err := om.OperationFailed(operation, errorMessage, err, log)
if err == nil {
@@ -90,7 +122,8 @@ func (om *OperationManager) RetryOperationWithoutFail(operation internal.Operati
}

log.Info(fmt.Sprintf("retrying for %s in %s steps", maxTime.String(), retryInterval.String()))
if time.Since(operation.UpdatedAt) < maxTime {
om.storeTimestampIfMissing(operation.ID)
if !om.isTimeoutOccurred(operation.ID, maxTime) {
return operation, retryInterval, nil
}
// update description to track failed steps
Expand Down Expand Up @@ -166,3 +199,17 @@ func (om *OperationManager) update(operation internal.Operation, state domain.La
operation.Description = description
}, log)
}

func (om *OperationManager) storeTimestampIfMissing(id string) {
om.mu.Lock()
defer om.mu.Unlock()
if om.retryTimestamps[id].IsZero() {
om.retryTimestamps[id] = time.Now()
}
}

func (om *OperationManager) isTimeoutOccurred(id string, maxTime time.Duration) bool {
om.mu.RLock()
defer om.mu.RUnlock()
return !om.retryTimestamps[id].IsZero() && time.Since(om.retryTimestamps[id]) > maxTime
}
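Taken together, these changes switch the retry timeout from time.Since(operation.UpdatedAt), which resets every time the operation is stored, to the time elapsed since the first retry recorded for a given operation ID. The following is a minimal, self-contained sketch of that bookkeeping, with hypothetical names (retryTracker, markFirstRetry, timedOut) that are not part of this commit:

package main

import (
	"fmt"
	"sync"
	"time"
)

// retryTracker is a trimmed-down illustration of the timestamp bookkeeping
// that OperationManager gains in this commit.
type retryTracker struct {
	mu         sync.RWMutex
	timestamps map[string]time.Time // key: operation ID
}

func newRetryTracker() *retryTracker {
	return &retryTracker{timestamps: make(map[string]time.Time)}
}

// markFirstRetry records the first retry time for an operation and leaves
// it untouched on subsequent calls, mirroring storeTimestampIfMissing.
func (r *retryTracker) markFirstRetry(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.timestamps[id].IsZero() {
		r.timestamps[id] = time.Now()
	}
}

// timedOut reports whether maxTime has elapsed since the first retry,
// mirroring isTimeoutOccurred.
func (r *retryTracker) timedOut(id string, maxTime time.Duration) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return !r.timestamps[id].IsZero() && time.Since(r.timestamps[id]) > maxTime
}

func main() {
	tracker := newRetryTracker()
	tracker.markFirstRetry("op-1")
	time.Sleep(15 * time.Millisecond)
	// Updating the operation elsewhere no longer resets this clock.
	fmt.Println(tracker.timedOut("op-1", 10*time.Millisecond)) // true: retry budget exhausted
	fmt.Println(tracker.timedOut("op-1", time.Hour))           // false: still within budget
}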
6 changes: 2 additions & 4 deletions internal/process/operation_manager_test.go
@@ -22,7 +22,7 @@ func Test_OperationManager_RetryOperationOnce(t *testing.T) {
opManager := NewOperationManager(operations, "some_step", kebErr.ProvisionerDependency)
op := internal.Operation{}
op.UpdatedAt = time.Now()
retryInterval := time.Hour
retryInterval := time.Millisecond
errMsg := fmt.Errorf("ups ... ")

// this is required to avoid storage retries (without this statement there will be an error => retry)
@@ -37,9 +37,7 @@ func Test_OperationManager_RetryOperationOnce(t *testing.T) {
assert.Nil(t, err)

// then - second call
t.Log(op.UpdatedAt.String())
op.UpdatedAt = op.UpdatedAt.Add(-retryInterval - time.Second) // simulate wait of first retry
t.Log(op.UpdatedAt.String())
time.Sleep(time.Millisecond * 2)
op, when, err = opManager.RetryOperationOnce(op, errMsg.Error(), errMsg, retryInterval, fixLogger())

// when - second call => no retry
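The hourly cleanup started in NewOperationManager is a standard ticker-in-a-goroutine pattern that prunes entries older than 48 hours. Below is a self-contained sketch of the same idea; the staleEntryGC helper, the stop channel, and the short durations are illustrative additions so the example terminates quickly, not part of this commit:

package main

import (
	"log/slog"
	"sync"
	"time"
)

// staleEntryGC periodically removes map entries older than retention,
// mirroring the loop that NewOperationManager starts around runTimestampGC.
func staleEntryGC(mu *sync.RWMutex, timestamps map[string]time.Time, interval, retention time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			mu.Lock()
			deleted := 0
			for id, ts := range timestamps {
				if time.Since(ts) > retention {
					delete(timestamps, id)
					deleted++
				}
			}
			mu.Unlock()
			if deleted > 0 {
				slog.Info("deleted old retry timestamps", "count", deleted)
			}
		}
	}
}

func main() {
	var mu sync.RWMutex
	timestamps := map[string]time.Time{
		"fresh-operation": time.Now(),
		"stale-operation": time.Now().Add(-time.Hour),
	}
	stop := make(chan struct{})
	go staleEntryGC(&mu, timestamps, 10*time.Millisecond, 30*time.Minute, stop)

	time.Sleep(50 * time.Millisecond)
	close(stop)

	mu.RLock()
	defer mu.RUnlock()
	slog.Info("remaining entries", "count", len(timestamps)) // only "fresh-operation" remains
}

Note that the sketch passes the deletion count as a structured slog attribute rather than a printf verb, which is the usual log/slog style.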
