From 4e4d0122b1f9e2f92f451b4bae8c32068dae79d2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Mi=C5=9Bkiewicz?=
Date: Mon, 27 Jan 2025 14:22:57 +0100
Subject: [PATCH] Proper retries (#1684)

* Proper retries

* wip
---
 internal/process/operation_manager.go      | 53 ++++++++++++++++++++--
 internal/process/operation_manager_test.go |  6 +--
 2 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/internal/process/operation_manager.go b/internal/process/operation_manager.go
index cf02367304..85bf473104 100644
--- a/internal/process/operation_manager.go
+++ b/internal/process/operation_manager.go
@@ -3,6 +3,7 @@ package process
 import (
 	"fmt"
 	"log/slog"
+	"sync"
 	"time"
 
 	"github.com/kyma-project/kyma-environment-broker/common/orchestration"
@@ -17,10 +18,38 @@ type OperationManager struct {
 	storage   storage.Operations
 	component kebErr.Component
 	step      string
+
+	// stores timestamp to calculate timeout in retry* methods, the key is the operation.ID
+	retryTimestamps map[string]time.Time
+	mu              sync.RWMutex
 }
 
 func NewOperationManager(storage storage.Operations, step string, component kebErr.Component) *OperationManager {
-	return &OperationManager{storage: storage, component: component, step: step}
+	op := &OperationManager{storage: storage, component: component, step: step, retryTimestamps: make(map[string]time.Time)}
+	go func() {
+		ticker := time.NewTicker(time.Hour)
+		defer ticker.Stop()
+		for {
+			<-ticker.C
+			runTimestampGC(op, step)
+		}
+	}()
+	return op
+}
+
+func runTimestampGC(op *OperationManager, step string) {
+	numberOfDeletions := 0
+	op.mu.Lock()
+	for opId, ts := range op.retryTimestamps {
+		if time.Since(ts) > 48*time.Hour {
+			delete(op.retryTimestamps, opId)
+			numberOfDeletions++
+		}
+	}
+	op.mu.Unlock()
+	if numberOfDeletions > 0 {
+		slog.Info(fmt.Sprintf("Operation Manager for step %s has deleted %d old timestamps", step, numberOfDeletions))
+	}
 }
 
 // OperationSucceeded marks the operation as succeeded and returns status of the operation's update
@@ -69,9 +98,12 @@ func (om *OperationManager) OperationCanceled(operation internal.Operation, desc
 func (om *OperationManager) RetryOperation(operation internal.Operation, errorMessage string, err error, retryInterval time.Duration, maxTime time.Duration, log *slog.Logger) (internal.Operation, time.Duration, error) {
 	log.Info(fmt.Sprintf("Retry Operation was triggered with message: %s", errorMessage))
 	log.Info(fmt.Sprintf("Retrying for %s in %s steps", maxTime.String(), retryInterval.String()))
-	if time.Since(operation.UpdatedAt) < maxTime {
+
+	om.storeTimestampIfMissing(operation.ID)
+	if !om.isTimeoutOccurred(operation.ID, maxTime) {
 		return operation, retryInterval, nil
 	}
+
 	log.Error(fmt.Sprintf("Aborting after %s of failing retries", maxTime.String()))
 	op, retry, err := om.OperationFailed(operation, errorMessage, err, log)
 	if err == nil {
@@ -90,7 +122,8 @@ func (om *OperationManager) RetryOperationWithoutFail(operation internal.Operati
 	}
 
 	log.Info(fmt.Sprintf("retrying for %s in %s steps", maxTime.String(), retryInterval.String()))
-	if time.Since(operation.UpdatedAt) < maxTime {
+	om.storeTimestampIfMissing(operation.ID)
+	if !om.isTimeoutOccurred(operation.ID, maxTime) {
 		return operation, retryInterval, nil
 	}
 	// update description to track failed steps
@@ -166,3 +199,17 @@ func (om *OperationManager) update(operation internal.Operation, state domain.La
 		operation.Description = description
 	}, log)
 }
+
+func (om *OperationManager) storeTimestampIfMissing(id string) {
+	om.mu.Lock()
+	defer om.mu.Unlock()
+	if om.retryTimestamps[id].IsZero() {
+		om.retryTimestamps[id] = time.Now()
+	}
+}
+
+func (om *OperationManager) isTimeoutOccurred(id string, maxTime time.Duration) bool {
+	om.mu.RLock()
+	defer om.mu.RUnlock()
+	return !om.retryTimestamps[id].IsZero() && time.Since(om.retryTimestamps[id]) > maxTime
+}
diff --git a/internal/process/operation_manager_test.go b/internal/process/operation_manager_test.go
index 89c19988d1..722b67f15f 100644
--- a/internal/process/operation_manager_test.go
+++ b/internal/process/operation_manager_test.go
@@ -22,7 +22,7 @@ func Test_OperationManager_RetryOperationOnce(t *testing.T) {
 	opManager := NewOperationManager(operations, "some_step", kebErr.ProvisionerDependency)
 	op := internal.Operation{}
 	op.UpdatedAt = time.Now()
-	retryInterval := time.Hour
+	retryInterval := time.Millisecond
 	errMsg := fmt.Errorf("ups ... ")
 
 	// this is required to avoid storage retries (without this statement there will be an error => retry)
@@ -37,9 +37,7 @@ func Test_OperationManager_RetryOperationOnce(t *testing.T) {
 	assert.Nil(t, err)
 
 	// then - second call
-	t.Log(op.UpdatedAt.String())
-	op.UpdatedAt = op.UpdatedAt.Add(-retryInterval - time.Second) // simulate wait of first retry
-	t.Log(op.UpdatedAt.String())
+	time.Sleep(time.Millisecond * 2)
 	op, when, err = opManager.RetryOperationOnce(op, errMsg.Error(), errMsg, retryInterval, fixLogger())
 
 	// when - second call => no retry
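
Usage sketch (illustrative, not part of the patch): the diff changes how the retry deadline is measured, from time.Since(operation.UpdatedAt) to a timestamp stored per operation.ID on the first retry, so later updates to the operation no longer push the deadline forward. A step built on this could look roughly like the code below. The exampleStep type, its Run signature, the doProvisioningWork helper, the 20s/10m durations, and the internal import path are assumptions for illustration; OperationManager, RetryOperation, and its (internal.Operation, time.Duration, error) return triple are taken from the diff.

// example_step.go - illustrative sketch only, not part of this patch.
package process

import (
	"log/slog"
	"time"

	// Import path assumed from the module path visible in the diff.
	"github.com/kyma-project/kyma-environment-broker/internal"
)

// exampleStep is a hypothetical step that delegates retry bookkeeping to OperationManager.
type exampleStep struct {
	operationManager *OperationManager
}

// Run retries a transient failure every 20 seconds and gives up 10 minutes after the
// first retry was recorded for this operation ID in retryTimestamps.
func (s *exampleStep) Run(operation internal.Operation, log *slog.Logger) (internal.Operation, time.Duration, error) {
	if err := doProvisioningWork(operation); err != nil {
		// RetryOperation records time.Now() for operation.ID on the first call; until
		// maxTime has elapsed it requeues the step after retryInterval, afterwards it
		// marks the operation as failed.
		return s.operationManager.RetryOperation(operation, "provisioning call failed", err, 20*time.Second, 10*time.Minute, log)
	}
	// Success: no requeue needed.
	return operation, 0, nil
}

// doProvisioningWork stands in for the real external call a step would make.
func doProvisioningWork(_ internal.Operation) error {
	return nil
}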
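
Two consequences of this design show up directly in the diff. The retryTimestamps map lives only in the broker process and is keyed by operation.ID, so the hourly GC goroutine drops entries older than 48 hours to keep the map bounded, and a broker restart effectively restarts the retry window for in-flight operations. Likewise, the test can no longer simulate an expired window by back-dating operation.UpdatedAt, because UpdatedAt is no longer consulted; it switches to a one-millisecond retryInterval and sleeps for two milliseconds so that more time than the configured interval has passed before the second RetryOperationOnce call.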