Skip to content

Commit

Permalink
Tekton retry on ImagePullBackOff (#345)
Browse files Browse the repository at this point in the history
* implement retry on image pull back-off with wait timeout

Co-authored-by: Alexander Link <[email protected]>
Co-authored-by: Andreas Brehmer <[email protected]>
  • Loading branch information
3 people authored Dec 22, 2022
1 parent a527b7e commit 45a1b10
Show file tree
Hide file tree
Showing 16 changed files with 777 additions and 80 deletions.
14 changes: 14 additions & 0 deletions changelog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@
- version: NEXT
date: TBD
changes:
- type: internal
impact: minor
title: Retry on ImagePullBackOff
description: |-
Since Tekton v0.41.0 TaskRuns fail if the corresponding pod is subject to
image pull back-off ([#4921](https://github.com/tektoncd/pipeline/pull/4921)).
In case of transient image pull failures this can fail Steward PipelineRun
processing (`error_infra`).
Steward now detects aborted TaskRuns due to ImagePullBackOff and retries with
a new TaskRun for a configurable period (`waitTimeout`).
This happens in the 'waiting' phase of Steward PipelineRun processing.
pullRequestNumber: 345
jiraIssueNumber: 1974

- version: "0.23.1"
date: 2022-12-09
Expand Down
4 changes: 2 additions & 2 deletions charts/steward/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 0.23.2-dev
version: 0.24.0-dev

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 0.23.2-dev
appVersion: 0.24.0-dev
5 changes: 5 additions & 0 deletions charts/steward/templates/config-pipelineruns.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ data:
# or "2h45m". Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
timeout: 2h15m
# waitTimeout is the maximum time a pipeline keeps retrying to start a Tekton task run
# that ran into an image pull back-off error before it gives up.
waitTimeout: 10m
limitRange: |
apiVersion: v1
kind: LimitRange
Expand Down Expand Up @@ -61,6 +65,7 @@ data:
jenkinsfileRunner.podSecurityContext.fsGroup: "1000"
timeout: {{ .Values.pipelineRuns.timeout | quote }}
waitTimeout: {{ .Values.pipelineRuns.waitTimeout | quote }}
limitRange: {{ default ( .Files.Get "data/pipelineruns-default-limitrange.yaml" ) .Values.pipelineRuns.limitRange | quote }}
resourceQuota: {{ .Values.pipelineRuns.resourceQuota | quote }}

Expand Down
1 change: 1 addition & 0 deletions charts/steward/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pipelineRuns:
pipelineCloneRetryIntervalSec: ""
pipelineCloneRetryTimeoutSec: ""
timeout: "60m"
waitTimeout: "10m"
defaultNetworkPolicyName: ""
networkPolicies: {}
limitRange: ""
Expand Down
11 changes: 11 additions & 0 deletions pkg/k8s/secrets/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ func RenameByAnnotationTransformer(key string) SecretTransformer {
}
}

// RenameTransformer returns a secret transformer function that sets
// `metadata.name` to the provided value. If an empty string is provided
// `metadata.name` is kept unchanged.
func RenameTransformer(newName string) SecretTransformer {
	return func(secret *v1.Secret) {
		if newName == "" {
			// empty name means: keep the original name
			return
		}
		secret.SetName(newName)
	}
}

// SetAnnotationTransformer returns a secret transformer function that sets the
// annotation with the given key to the given value.
func SetAnnotationTransformer(key string, value string) SecretTransformer {
Expand Down
35 changes: 35 additions & 0 deletions pkg/k8s/secrets/transformers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,41 @@ func Test_RenameByAnnotationTransformer(t *testing.T) {
}
}

// Test_RenameTransformer verifies that the transformer renames a secret
// when a non-empty name is given and leaves the name untouched otherwise.
func Test_RenameTransformer(t *testing.T) {
	t.Parallel()
	const originalName string = "orig1"

	testCases := []struct {
		name         string
		newName      string
		expectedName string
	}{
		{
			name:         "empty_name",
			newName:      "",
			expectedName: originalName,
		},
		{
			name:         "working_rename",
			newName:      "newName1",
			expectedName: "newName1",
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// SETUP
			orig := fake.SecretOpaque(originalName, "secret1")
			transformed := orig.DeepCopy()

			// EXERCISE
			examinee := RenameTransformer(tc.newName)
			examinee(transformed)

			// VERIFY
			expected := orig.DeepCopy()
			expected.SetName(tc.expectedName)
			assert.DeepEqual(t, expected, transformed)
		})
	}
}

func Test_SetAnnotationTransformer_SetNew(t *testing.T) {
t.Parallel()

Expand Down
15 changes: 15 additions & 0 deletions pkg/runctl/cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
mainConfigMapName = "steward-pipelineruns"
mainConfigKeyTimeout = "timeout"
mainConfigKeyTimeoutWait = "waitTimeout"
mainConfigKeyLimitRange = "limitRange"
mainConfigKeyResourceQuota = "resourceQuota"
mainConfigKeyImage = "jenkinsfileRunner.image"
Expand All @@ -37,6 +38,11 @@ type PipelineRunsConfigStruct struct {
// If `nil`, a default timeout should be used.
Timeout *metav1.Duration

// TimeoutWait is the maximum time a pipeline run can stay in state waiting
// before it is stopped with timeout error.
// If `nil`, the timeout is set to 10 minutes.
TimeoutWait *metav1.Duration

// The manifest (in YAML format) of a Kubernetes LimitRange object to be
// applied to each pipeline run sandbox namespace.
// If empty, no limit range will be defined.
Expand Down Expand Up @@ -206,6 +212,11 @@ func processMainConfig(configData map[string]string, dest *PipelineRunsConfigStr
return err
}

if dest.TimeoutWait, err =
parseDuration(mainConfigKeyTimeoutWait); err != nil {
return err
}

if dest.JenkinsfileRunnerPodSecurityContextRunAsUser, err =
parseInt64(mainConfigKeyPSCRunAsUser); err != nil {
return err
Expand All @@ -223,6 +234,10 @@ func processMainConfig(configData map[string]string, dest *PipelineRunsConfigStr
return nil
}

func metav1Duration(d time.Duration) *metav1.Duration {
return &metav1.Duration{Duration: d}
}

func processNetworkPoliciesConfig(configData map[string]string, dest *PipelineRunsConfigStruct) error {

isValidKey := func(key string) bool {
Expand Down
12 changes: 8 additions & 4 deletions pkg/runctl/cfg/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func Test_loadPipelineRunsConfig_CompleteConfig(t *testing.T) {
mainConfigKeyPSCRunAsGroup: "2222",
mainConfigKeyPSCFSGroup: "3333",
mainConfigKeyTimeout: "4444m",
mainConfigKeyTimeoutWait: "555m",
mainConfigKeyImage: "jfrImage1",
mainConfigKeyImagePullPolicy: "jfrImagePullPolicy1",
"someKeyThatShouldBeIgnored": "34957349",
Expand All @@ -210,6 +211,7 @@ func Test_loadPipelineRunsConfig_CompleteConfig(t *testing.T) {
assert.NilError(t, resultErr)
expectedConfig := &PipelineRunsConfigStruct{
Timeout: metav1Duration(time.Minute * 4444),
TimeoutWait: metav1Duration(time.Minute * 555),
LimitRange: "limitRange1",
ResourceQuota: "resourceQuota1",
JenkinsfileRunnerImage: "jfrImage1",
Expand Down Expand Up @@ -273,6 +275,9 @@ func Test_loadPipelineRunsConfig_InvalidValues(t *testing.T) {

{mainConfigKeyTimeout, "a"},
{mainConfigKeyTimeout, "1a"},

{mainConfigKeyTimeoutWait, "a"},
{mainConfigKeyTimeoutWait, "1a"},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
tc := tc // capture current value before going parallel
Expand Down Expand Up @@ -311,6 +316,7 @@ func Test_processMainConfig(t *testing.T) {
"_example": "exampleString",

mainConfigKeyTimeout: "4444m",
mainConfigKeyTimeoutWait: "555m",
mainConfigKeyLimitRange: "limitRange1",
mainConfigKeyResourceQuota: "resourceQuota1",

Expand All @@ -324,6 +330,7 @@ func Test_processMainConfig(t *testing.T) {
},
&PipelineRunsConfigStruct{
Timeout: metav1Duration(time.Minute * 4444),
TimeoutWait: metav1Duration(time.Minute * 555),
LimitRange: "limitRange1",
ResourceQuota: "resourceQuota1",

Expand All @@ -338,6 +345,7 @@ func Test_processMainConfig(t *testing.T) {
"all_empty",
map[string]string{
mainConfigKeyTimeout: "",
mainConfigKeyTimeoutWait: "",
mainConfigKeyLimitRange: "",
mainConfigKeyResourceQuota: "",

Expand Down Expand Up @@ -540,8 +548,4 @@ func newNetworkPolicyConfigMap(data map[string]string) *corev1.ConfigMap {
}
}

func metav1Duration(d time.Duration) *metav1.Duration {
return &metav1.Duration{Duration: d}
}

func int64Ptr(val int64) *int64 { return &val }
89 changes: 80 additions & 9 deletions pkg/runctl/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
var (
// Interval for histogram creation set to prometheus default scrape interval
meteringInterval = 1 * time.Minute

defaultWaitTimeout = 10 * time.Minute
)

// Controller processes PipelineRun resources
Expand Down Expand Up @@ -384,24 +386,31 @@ func (c *Controller) syncHandler(key string) error {

runManager := c.createRunManager(pipelineRun)

// Process pipeline run based on current state
switch state := pipelineRun.GetStatus().State; state {
case api.StatePreparing:
var pipelineRunsConfig *cfg.PipelineRunsConfigStruct
state := pipelineRun.GetStatus().State
if state == api.StatePreparing || state == api.StateWaiting {
// the configuration should be loaded once per sync to avoid inconsistencies
// in case of concurrent configuration changes
pipelineRunsConfig, err := c.loadPipelineRunsConfig(ctx)
pipelineRunsConfig, err = c.loadPipelineRunsConfig(ctx)
if err != nil {
return c.onGetRunError(ctx, pipelineRunAPIObj, pipelineRun, err, api.StateFinished, api.ResultErrorInfra, "failed to load configuration for pipeline runs")
if state == api.StatePreparing {
return c.onGetRunError(ctx, pipelineRunAPIObj, pipelineRun, err, api.StateFinished, api.ResultErrorInfra, "failed to load configuration for pipeline runs")
}
return c.onGetRunError(ctx, pipelineRunAPIObj, pipelineRun, err, api.StateCleaning, api.ResultErrorInfra, "failed to load configuration for pipeline runs")
}
namespace, auxNamespace, err := runManager.Start(ctx, pipelineRun, pipelineRunsConfig)
}

// Process pipeline run based on current state
switch state {
case api.StatePreparing:

namespace, auxNamespace, err := runManager.Prepare(ctx, pipelineRun, pipelineRunsConfig)
if err != nil {
c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonPreparingFailed, err.Error())
resultClass := serrors.GetClass(err)
// In case we have a result we can cleanup. Otherwise we retry in the next iteration.
if resultClass != api.ResultUndefined {
pipelineRun.UpdateMessage(err.Error())
pipelineRun.StoreErrorAsMessage(err, "preparing failed")
return c.updateStateAndResult(ctx, pipelineRun, api.StateCleaning, resultClass, metav1.Now())
return c.handleResultError(ctx, pipelineRun, resultClass, "preparing failed", err)
}
return err
}
Expand All @@ -412,11 +421,46 @@ func (c *Controller) syncHandler(key string) error {
if err = c.changeAndCommitStateAndMeter(ctx, pipelineRun, api.StateWaiting, metav1.Now()); err != nil {
return err
}

case api.StateWaiting:
run, err := runManager.GetRun(ctx, pipelineRun)
if err != nil {
return c.onGetRunError(ctx, pipelineRunAPIObj, pipelineRun, err, api.StateCleaning, api.ResultErrorInfra, "waiting failed")
}

// Check for wait timeout
startTime := pipelineRun.GetStatus().StateDetails.StartedAt
timeout := c.getWaitTimeout(pipelineRunsConfig)
if startTime.Add(timeout.Duration).Before(time.Now()) {
err := fmt.Errorf(
"main pod has not started after %s",
timeout,
)
return c.handleResultError(ctx, pipelineRun, api.ResultErrorInfra, "waiting failed", err)
}

if run == nil {
if err = runManager.Start(ctx, pipelineRun, pipelineRunsConfig); err != nil {
c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonWaitingFailed, err.Error())
resultClass := serrors.GetClass(err)
// In case we have a result we can cleanup. Otherwise we retry in the next iteration.
if resultClass != api.ResultUndefined {
return c.handleResultError(ctx, pipelineRun, resultClass, "waiting failed", err)
}
return err
}
return nil
} else if run.IsRestartable() {
c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonWaitingFailed, "restarting")
if err = runManager.DeleteRun(ctx, pipelineRun); err != nil {
if serrors.IsRecoverable(err) {
return err
}
return c.handleResultError(ctx, pipelineRun, api.ResultErrorInfra, "run deletion for restart failed", err)
}
return nil
}

started := run.GetStartTime()
if started != nil {
if err := c.changeAndCommitStateAndMeter(ctx, pipelineRun, api.StateRunning, *started); err != nil {
Expand All @@ -428,6 +472,11 @@ func (c *Controller) syncHandler(key string) error {
if err != nil {
return c.onGetRunError(ctx, pipelineRunAPIObj, pipelineRun, err, api.StateCleaning, api.ResultErrorInfra, "running failed")
}
if run == nil {
err = fmt.Errorf("task run not found in namespace %q", pipelineRun.GetRunNamespace())
return c.onGetRunError(ctx, pipelineRunAPIObj, pipelineRun, err, api.StateCleaning, api.ResultErrorInfra, "running failed")
}

containerInfo := run.GetContainerInfo()
pipelineRun.UpdateContainer(containerInfo)
if finished, result := run.IsFinished(); finished {
Expand Down Expand Up @@ -455,6 +504,28 @@ func (c *Controller) syncHandler(key string) error {
return nil
}

// getWaitTimeout returns the wait timeout to apply while a pipeline run
// is in the waiting state. It takes the value from the given pipeline
// runs configuration; if that value is unset (nil) or amounts to zero
// when truncated to full seconds, the package-level defaultWaitTimeout
// is returned instead.
func (c *Controller) getWaitTimeout(pipelineRunsConfig *cfg.PipelineRunsConfigStruct) *metav1.Duration {
	timeout := pipelineRunsConfig.TimeoutWait
	if isZeroDuration(timeout) {
		// defaultWaitTimeout is already a time.Duration; the original
		// time.Duration(...) conversion was redundant.
		timeout = metav1Duration(defaultWaitTimeout)
	}
	return timeout
}

// metav1Duration converts a plain time.Duration into a pointer to a
// metav1.Duration wrapper.
func metav1Duration(d time.Duration) *metav1.Duration {
	wrapped := metav1.Duration{Duration: d}
	return &wrapped
}

// isZeroDuration reports whether the given duration is unset (nil) or
// is zero when truncated to full seconds, i.e. its absolute value is
// less than one second.
func isZeroDuration(d *metav1.Duration) bool {
	if d == nil {
		return true
	}
	return d.Truncate(time.Second) == 0
}

// handleResultError finalizes a failed pipeline run: it stores the given
// error and context message in the pipeline run status and transitions
// the run into the cleaning state with the given result class, stamping
// the transition with the current time.
// NOTE(review): both UpdateMessage and StoreErrorAsMessage receive the
// same error — presumably one sets the current message and the other
// records it in a history; confirm against the k8s.PipelineRun
// implementation whether both calls are intended.
func (c *Controller) handleResultError(ctx context.Context, pipelineRun k8s.PipelineRun, result api.Result, message string, err error) error {
	pipelineRun.UpdateMessage(err.Error())
	pipelineRun.StoreErrorAsMessage(err, message)
	return c.updateStateAndResult(ctx, pipelineRun, api.StateCleaning, result, metav1.Now())
}

func (c *Controller) onGetRunError(ctx context.Context, pipelineRunAPIObj *api.PipelineRun, pipelineRun k8s.PipelineRun, err error, state api.State, result api.Result, message string) error {
c.recorder.Event(pipelineRunAPIObj, corev1.EventTypeWarning, api.EventReasonRunningFailed, err.Error())
if serrors.IsRecoverable(err) {
Expand Down
Loading

0 comments on commit 45a1b10

Please sign in to comment.