Skip to content

Commit

Permalink
fix: Ensure pending condition is always preserved (#887)
Browse files Browse the repository at this point in the history
  • Loading branch information
shorim authored Mar 14, 2024
1 parent ccf8600 commit 6ef6e90
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 53 deletions.
32 changes: 15 additions & 17 deletions internal/conditions/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func MessageFor(reason string, messageMap map[string]string) string {
return ""
}

func SetPendingCondition(ctx context.Context, conditions *[]metav1.Condition, generation int64, reason, resourceName string, messageMap map[string]string) {
func HandlePendingCondition(ctx context.Context, conditions *[]metav1.Condition, generation int64, reason, resourceName string, messageMap map[string]string) {
log := logf.FromContext(ctx)

pending := New(
Expand All @@ -127,32 +127,30 @@ func SetPendingCondition(ctx context.Context, conditions *[]metav1.Condition, ge
meta.SetStatusCondition(conditions, pending)
}

func SetRunningCondition(ctx context.Context, conditions *[]metav1.Condition, generation int64, reason, resourceName string, messageMap map[string]string) {
func HandleRunningCondition(ctx context.Context, conditions *[]metav1.Condition, generation int64, runningReason, pendingReason, resourceName string, messageMap map[string]string) {
log := logf.FromContext(ctx)

existingPending := meta.FindStatusCondition(*conditions, TypePending)
if existingPending != nil {
newPending := New(
TypePending,
existingPending.Reason,
metav1.ConditionFalse,
generation,
messageMap,
)
newPending.Message = PendingTypeDeprecationMsg + newPending.Message
log.V(1).Info(fmt.Sprintf("Updating the status of %s: Setting the Pending condition to False", resourceName))
meta.SetStatusCondition(conditions, newPending)
}
// Set Pending condition to False
pending := New(
TypePending,
pendingReason,
metav1.ConditionFalse,
generation,
messageMap,
)
pending.Message = PendingTypeDeprecationMsg + pending.Message
log.V(1).Info(fmt.Sprintf("Updating the status of %s: Setting the Pending condition to False", resourceName))
meta.SetStatusCondition(conditions, pending)

// Set Running condition to True
running := New(
TypeRunning,
reason,
runningReason,
metav1.ConditionTrue,
generation,
messageMap,
)
running.Message = RunningTypeDeprecationMsg + running.Message

log.V(1).Info(fmt.Sprintf("Updating the status of %s: Setting the Running condition to True", resourceName))
meta.SetStatusCondition(conditions, running)
}
84 changes: 61 additions & 23 deletions internal/conditions/conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,31 @@ func TestMessageFor(t *testing.T) {
})
}

func TestSetPendingCondition(t *testing.T) {
t.Run("should just add pending condition if the conditions list is empty", func(t *testing.T) {
var conditions []metav1.Condition
func TestHandlePendingCondition(t *testing.T) {
t.Run("should just set pending condition to true if running condition is not in the conditions list", func(t *testing.T) {
conditions := []metav1.Condition{
{
Type: TypeAgentHealthy,
Status: metav1.ConditionFalse,
Reason: ReasonDaemonSetNotReady,
Message: MessageFor(ReasonDaemonSetNotReady, LogsMessage),
LastTransitionTime: metav1.Now(),
},
{
Type: TypeConfigurationGenerated,
Status: metav1.ConditionTrue,
Reason: ReasonConfigurationGenerated,
Message: MessageFor(ReasonConfigurationGenerated, LogsMessage),
LastTransitionTime: metav1.Now(),
},
}
generation := int64(1)
reason := ReasonFluentBitDSNotReady

SetPendingCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)
HandlePendingCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)

pendingCond := meta.FindStatusCondition(conditions, TypePending)
conditionsSize := len(conditions)
pendingCond := conditions[conditionsSize-1]
require.Equal(t, TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionTrue, pendingCond.Status)
require.Equal(t, reason, pendingCond.Reason)
Expand All @@ -52,6 +68,20 @@ func TestSetPendingCondition(t *testing.T) {

t.Run("should remove running condition and set pending condition to true", func(t *testing.T) {
conditions := []metav1.Condition{
{
Type: TypeAgentHealthy,
Status: metav1.ConditionFalse,
Reason: ReasonDaemonSetNotReady,
Message: MessageFor(ReasonDaemonSetNotReady, LogsMessage),
LastTransitionTime: metav1.Now(),
},
{
Type: TypeConfigurationGenerated,
Status: metav1.ConditionTrue,
Reason: ReasonConfigurationGenerated,
Message: MessageFor(ReasonConfigurationGenerated, LogsMessage),
LastTransitionTime: metav1.Now(),
},
{
Type: TypePending,
Status: metav1.ConditionFalse,
Expand All @@ -70,13 +100,13 @@ func TestSetPendingCondition(t *testing.T) {
generation := int64(1)
reason := ReasonFluentBitDSNotReady

SetPendingCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)
HandlePendingCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)

runningCond := meta.FindStatusCondition(conditions, TypeRunning)
require.Nil(t, runningCond)

pendingCond := meta.FindStatusCondition(conditions, TypePending)
require.NotNil(t, pendingCond)
conditionsSize := len(conditions)
pendingCond := conditions[conditionsSize-1]
require.Equal(t, TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionTrue, pendingCond.Status)
require.Equal(t, reason, pendingCond.Reason)
Expand All @@ -87,38 +117,46 @@ func TestSetPendingCondition(t *testing.T) {
})
}

func TestSetRunningCondition(t *testing.T) {
t.Run("should set pending condition to false and add running condition", func(t *testing.T) {
func TestHandleRunningCondition(t *testing.T) {
t.Run("should set pending condition to false and set running condition to true", func(t *testing.T) {
conditions := []metav1.Condition{
{
Type: TypePending,
Type: TypeAgentHealthy,
Status: metav1.ConditionTrue,
Reason: ReasonFluentBitDSNotReady,
Message: PendingTypeDeprecationMsg + MessageFor(ReasonFluentBitDSNotReady, LogsMessage),
Reason: ReasonDaemonSetReady,
Message: MessageFor(ReasonDaemonSetReady, LogsMessage),
LastTransitionTime: metav1.Now(),
},
{
Type: TypeConfigurationGenerated,
Status: metav1.ConditionTrue,
Reason: ReasonConfigurationGenerated,
Message: MessageFor(ReasonConfigurationGenerated, LogsMessage),
LastTransitionTime: metav1.Now(),
},
}
generation := int64(1)
reason := ReasonFluentBitDSReady
runningReason := ReasonFluentBitDSReady
pendingReason := ReasonFluentBitDSNotReady

HandleRunningCondition(context.Background(), &conditions, generation, runningReason, pendingReason, "pipeline", LogsMessage)

SetRunningCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)
conditionsSize := len(conditions)

pendingCond := meta.FindStatusCondition(conditions, TypePending)
require.NotNil(t, pendingCond)
pendingCond := conditions[conditionsSize-2]
require.Equal(t, TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionFalse, pendingCond.Status)
require.Equal(t, ReasonFluentBitDSNotReady, pendingCond.Reason)
pendingCondMsg := PendingTypeDeprecationMsg + MessageFor(ReasonFluentBitDSNotReady, LogsMessage)
require.Equal(t, pendingReason, pendingCond.Reason)
pendingCondMsg := PendingTypeDeprecationMsg + MessageFor(pendingReason, LogsMessage)
require.Equal(t, pendingCondMsg, pendingCond.Message)
require.Equal(t, generation, pendingCond.ObservedGeneration)
require.NotEmpty(t, pendingCond.LastTransitionTime)

runningCond := meta.FindStatusCondition(conditions, TypeRunning)
require.NotNil(t, runningCond)
runningCond := conditions[conditionsSize-1]
require.Equal(t, TypeRunning, runningCond.Type)
require.Equal(t, metav1.ConditionTrue, runningCond.Status)
require.Equal(t, reason, runningCond.Reason)
runningCondMsg := RunningTypeDeprecationMsg + MessageFor(reason, LogsMessage)
require.Equal(t, runningReason, runningCond.Reason)
runningCondMsg := RunningTypeDeprecationMsg + MessageFor(runningReason, LogsMessage)
require.Equal(t, runningCondMsg, runningCond.Message)
require.Equal(t, generation, runningCond.ObservedGeneration)
require.NotEmpty(t, runningCond.LastTransitionTime)
Expand Down
12 changes: 10 additions & 2 deletions internal/reconciler/logparser/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,18 @@ func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, parser
}

if !fluentBitReady {
conditions.SetPendingCondition(ctx, &parser.Status.Conditions, parser.Generation, conditions.ReasonFluentBitDSNotReady, parser.Name, conditions.LogsMessage)
conditions.HandlePendingCondition(ctx, &parser.Status.Conditions, parser.Generation, conditions.ReasonFluentBitDSNotReady, parser.Name, conditions.LogsMessage)
return

}

conditions.SetRunningCondition(ctx, &parser.Status.Conditions, parser.Generation, conditions.ReasonFluentBitDSReady, parser.Name, conditions.LogsMessage)
conditions.HandleRunningCondition(
ctx,
&parser.Status.Conditions,
parser.Generation,
conditions.ReasonFluentBitDSReady,
conditions.ReasonFluentBitDSNotReady,
parser.Name,
conditions.LogsMessage,
)
}
10 changes: 10 additions & 0 deletions internal/reconciler/logparser/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func TestUpdateStatus(t *testing.T) {
require.NotEmpty(t, agentHealthyCond.LastTransitionTime)

conditionsSize := len(updatedParser.Status.Conditions)

pendingCond := updatedParser.Status.Conditions[conditionsSize-2]
require.Equal(t, conditions.TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionFalse, pendingCond.Status)
require.Equal(t, conditions.ReasonFluentBitDSNotReady, pendingCond.Reason)
pendingCondMsg := conditions.PendingTypeDeprecationMsg + conditions.MessageFor(conditions.ReasonFluentBitDSNotReady, conditions.LogsMessage)
require.Equal(t, pendingCondMsg, pendingCond.Message)
require.Equal(t, updatedParser.Generation, pendingCond.ObservedGeneration)
require.NotEmpty(t, pendingCond.LastTransitionTime)

runningCond := updatedParser.Status.Conditions[conditionsSize-1]
require.Equal(t, conditions.TypeRunning, runningCond.Type)
require.Equal(t, metav1.ConditionTrue, runningCond.Status)
Expand Down
16 changes: 12 additions & 4 deletions internal/reconciler/logpipeline/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ func (r *Reconciler) setFluentBitConfigGeneratedCondition(ctx context.Context, p
func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline) {

if pipeline.Spec.Output.IsLokiDefined() {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonUnsupportedLokiOutput, pipeline.Name, conditions.LogsMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonUnsupportedLokiOutput, pipeline.Name, conditions.LogsMessage)
return
}

referencesNonExistentSecret := secretref.ReferencesNonExistentSecret(ctx, r.Client, pipeline)
if referencesNonExistentSecret {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonReferencedSecretMissing, pipeline.Name, conditions.LogsMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonReferencedSecretMissing, pipeline.Name, conditions.LogsMessage)
return
}

Expand All @@ -119,9 +119,17 @@ func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, pipelin
}

if !fluentBitReady {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonFluentBitDSNotReady, pipeline.Name, conditions.LogsMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonFluentBitDSNotReady, pipeline.Name, conditions.LogsMessage)
return
}

conditions.SetRunningCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonFluentBitDSReady, pipeline.Name, conditions.LogsMessage)
conditions.HandleRunningCondition(
ctx,
&pipeline.Status.Conditions,
pipeline.Generation,
conditions.ReasonFluentBitDSReady,
conditions.ReasonFluentBitDSNotReady,
pipeline.Name,
conditions.LogsMessage,
)
}
10 changes: 10 additions & 0 deletions internal/reconciler/logpipeline/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func TestUpdateStatus(t *testing.T) {
require.NotEmpty(t, agentHealthyCond.LastTransitionTime)

conditionsSize := len(updatedPipeline.Status.Conditions)

pendingCond := updatedPipeline.Status.Conditions[conditionsSize-2]
require.Equal(t, conditions.TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionFalse, pendingCond.Status)
require.Equal(t, conditions.ReasonFluentBitDSNotReady, pendingCond.Reason)
pendingCondMsg := conditions.PendingTypeDeprecationMsg + conditions.MessageFor(conditions.ReasonFluentBitDSNotReady, conditions.LogsMessage)
require.Equal(t, pendingCondMsg, pendingCond.Message)
require.Equal(t, updatedPipeline.Generation, pendingCond.ObservedGeneration)
require.NotEmpty(t, pendingCond.LastTransitionTime)

runningCond := updatedPipeline.Status.Conditions[conditionsSize-1]
require.Equal(t, conditions.TypeRunning, runningCond.Type)
require.Equal(t, metav1.ConditionTrue, runningCond.Status)
Expand Down
18 changes: 13 additions & 5 deletions internal/reconciler/tracepipeline/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, with
return nil
}

// If the "GatewayHealthy" type doesn't exist in the conditions,
// If the "GatewayHealthy" type doesn't exist in the conditions
// then we need to reset the conditions list to ensure that the "Pending" and "Running" conditions are appended to the end of the conditions list
// Check step 3 in https://github.com/kyma-project/telemetry-manager/blob/main/docs/contributor/arch/004-consolidate-pipeline-statuses.md#decision
if meta.FindStatusCondition(pipeline.Status.Conditions, conditions.TypeGatewayHealthy) == nil {
Expand Down Expand Up @@ -85,13 +85,13 @@ func (r *Reconciler) setGatewayConfigGeneratedCondition(ctx context.Context, pip

func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, pipeline *telemetryv1alpha1.TracePipeline, withinPipelineCountLimit bool) {
if !withinPipelineCountLimit {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonMaxPipelinesExceeded, pipeline.Name, conditions.TracesMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonMaxPipelinesExceeded, pipeline.Name, conditions.TracesMessage)
return
}

referencesNonExistentSecret := secretref.ReferencesNonExistentSecret(ctx, r.Client, pipeline)
if referencesNonExistentSecret {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonReferencedSecretMissing, pipeline.Name, conditions.TracesMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonReferencedSecretMissing, pipeline.Name, conditions.TracesMessage)
return
}

Expand All @@ -102,9 +102,17 @@ func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, pipelin
}

if !gatewayReady {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonTraceGatewayDeploymentNotReady, pipeline.Name, conditions.TracesMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonTraceGatewayDeploymentNotReady, pipeline.Name, conditions.TracesMessage)
return
}

conditions.SetRunningCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonTraceGatewayDeploymentReady, pipeline.Name, conditions.TracesMessage)
conditions.HandleRunningCondition(
ctx,
&pipeline.Status.Conditions,
pipeline.Generation,
conditions.ReasonTraceGatewayDeploymentReady,
conditions.ReasonTraceGatewayDeploymentNotReady,
pipeline.Name,
conditions.TracesMessage,
)
}
10 changes: 10 additions & 0 deletions internal/reconciler/tracepipeline/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ func TestUpdateStatus(t *testing.T) {
require.NotEmpty(t, gatewayHealthyCond.LastTransitionTime)

conditionsSize := len(updatedPipeline.Status.Conditions)

pendingCond := updatedPipeline.Status.Conditions[conditionsSize-2]
require.Equal(t, conditions.TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionFalse, pendingCond.Status)
require.Equal(t, conditions.ReasonTraceGatewayDeploymentNotReady, pendingCond.Reason)
pendingCondMsg := conditions.PendingTypeDeprecationMsg + conditions.MessageFor(conditions.ReasonTraceGatewayDeploymentNotReady, conditions.TracesMessage)
require.Equal(t, pendingCondMsg, pendingCond.Message)
require.Equal(t, updatedPipeline.Generation, pendingCond.ObservedGeneration)
require.NotEmpty(t, pendingCond.LastTransitionTime)

runningCond := updatedPipeline.Status.Conditions[conditionsSize-1]
require.Equal(t, conditions.TypeRunning, runningCond.Type)
require.Equal(t, metav1.ConditionTrue, runningCond.Status)
Expand Down
3 changes: 2 additions & 1 deletion test/testkit/verifiers/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func LogPipelineShouldBeHealthy(ctx context.Context, k8sClient client.Client, pi
var pipeline telemetryv1alpha1.LogPipeline
key := types.NamespacedName{Name: pipelineName}
g.Expect(k8sClient.Get(ctx, key, &pipeline)).To(Succeed())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeRunning)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeAgentHealthy)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeConfigurationGenerated)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeRunning)).To(BeTrue())
g.Expect(meta.IsStatusConditionFalse(pipeline.Status.Conditions, conditions.TypePending)).To(BeTrue())
}, periodic.EventuallyTimeout, periodic.DefaultInterval).Should(Succeed())
}
3 changes: 2 additions & 1 deletion test/testkit/verifiers/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ func TracePipelineShouldBeHealthy(ctx context.Context, k8sClient client.Client,
var pipeline telemetryv1alpha1.TracePipeline
key := types.NamespacedName{Name: pipelineName}
g.Expect(k8sClient.Get(ctx, key, &pipeline)).To(Succeed())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeRunning)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeGatewayHealthy)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeConfigurationGenerated)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeRunning)).To(BeTrue())
g.Expect(meta.IsStatusConditionFalse(pipeline.Status.Conditions, conditions.TypePending)).To(BeTrue())
}, periodic.EventuallyTimeout, periodic.DefaultInterval).Should(Succeed())
}

Expand Down

0 comments on commit 6ef6e90

Please sign in to comment.