Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ensure pending condition is always preserved #887

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions internal/conditions/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func MessageFor(reason string, messageMap map[string]string) string {
return ""
}

func SetPendingCondition(ctx context.Context, conditions *[]metav1.Condition, generation int64, reason, resourceName string, messageMap map[string]string) {
func HandlePendingCondition(ctx context.Context, conditions *[]metav1.Condition, generation int64, reason, resourceName string, messageMap map[string]string) {
log := logf.FromContext(ctx)

pending := New(
Expand All @@ -127,32 +127,30 @@ func SetPendingCondition(ctx context.Context, conditions *[]metav1.Condition, ge
meta.SetStatusCondition(conditions, pending)
}

func SetRunningCondition(ctx context.Context, conditions *[]metav1.Condition, generation int64, reason, resourceName string, messageMap map[string]string) {
func HandleRunningCondition(ctx context.Context, conditions *[]metav1.Condition, generation int64, runningReason, pendingReason, resourceName string, messageMap map[string]string) {
log := logf.FromContext(ctx)

existingPending := meta.FindStatusCondition(*conditions, TypePending)
if existingPending != nil {
newPending := New(
TypePending,
existingPending.Reason,
metav1.ConditionFalse,
generation,
messageMap,
)
newPending.Message = PendingTypeDeprecationMsg + newPending.Message
log.V(1).Info(fmt.Sprintf("Updating the status of %s: Setting the Pending condition to False", resourceName))
meta.SetStatusCondition(conditions, newPending)
}
// Set Pending condition to False
pending := New(
TypePending,
pendingReason,
metav1.ConditionFalse,
generation,
messageMap,
)
pending.Message = PendingTypeDeprecationMsg + pending.Message
log.V(1).Info(fmt.Sprintf("Updating the status of %s: Setting the Pending condition to False", resourceName))
meta.SetStatusCondition(conditions, pending)

// Set Running condition to True
running := New(
TypeRunning,
reason,
runningReason,
metav1.ConditionTrue,
generation,
messageMap,
)
running.Message = RunningTypeDeprecationMsg + running.Message

log.V(1).Info(fmt.Sprintf("Updating the status of %s: Setting the Running condition to True", resourceName))
meta.SetStatusCondition(conditions, running)
}
84 changes: 61 additions & 23 deletions internal/conditions/conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,31 @@ func TestMessageFor(t *testing.T) {
})
}

func TestSetPendingCondition(t *testing.T) {
t.Run("should just add pending condition if the conditions list is empty", func(t *testing.T) {
var conditions []metav1.Condition
func TestHandlePendingCondition(t *testing.T) {
t.Run("should just set pending condition to true if running condition is not in the conditions list", func(t *testing.T) {
conditions := []metav1.Condition{
{
Type: TypeAgentHealthy,
Status: metav1.ConditionFalse,
Reason: ReasonDaemonSetNotReady,
Message: MessageFor(ReasonDaemonSetNotReady, LogsMessage),
LastTransitionTime: metav1.Now(),
},
{
Type: TypeConfigurationGenerated,
Status: metav1.ConditionTrue,
Reason: ReasonConfigurationGenerated,
Message: MessageFor(ReasonConfigurationGenerated, LogsMessage),
LastTransitionTime: metav1.Now(),
},
}
generation := int64(1)
reason := ReasonFluentBitDSNotReady

SetPendingCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)
HandlePendingCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)

pendingCond := meta.FindStatusCondition(conditions, TypePending)
conditionsSize := len(conditions)
pendingCond := conditions[conditionsSize-1]
require.Equal(t, TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionTrue, pendingCond.Status)
require.Equal(t, reason, pendingCond.Reason)
Expand All @@ -52,6 +68,20 @@ func TestSetPendingCondition(t *testing.T) {

t.Run("should remove running condition and set pending condition to true", func(t *testing.T) {
conditions := []metav1.Condition{
{
Type: TypeAgentHealthy,
Status: metav1.ConditionFalse,
Reason: ReasonDaemonSetNotReady,
Message: MessageFor(ReasonDaemonSetNotReady, LogsMessage),
LastTransitionTime: metav1.Now(),
},
{
Type: TypeConfigurationGenerated,
Status: metav1.ConditionTrue,
Reason: ReasonConfigurationGenerated,
Message: MessageFor(ReasonConfigurationGenerated, LogsMessage),
LastTransitionTime: metav1.Now(),
},
{
Type: TypePending,
Status: metav1.ConditionFalse,
Expand All @@ -70,13 +100,13 @@ func TestSetPendingCondition(t *testing.T) {
generation := int64(1)
reason := ReasonFluentBitDSNotReady

SetPendingCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)
HandlePendingCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)

runningCond := meta.FindStatusCondition(conditions, TypeRunning)
require.Nil(t, runningCond)

pendingCond := meta.FindStatusCondition(conditions, TypePending)
require.NotNil(t, pendingCond)
conditionsSize := len(conditions)
pendingCond := conditions[conditionsSize-1]
require.Equal(t, TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionTrue, pendingCond.Status)
require.Equal(t, reason, pendingCond.Reason)
Expand All @@ -87,38 +117,46 @@ func TestSetPendingCondition(t *testing.T) {
})
}

func TestSetRunningCondition(t *testing.T) {
t.Run("should set pending condition to false and add running condition", func(t *testing.T) {
func TestHandleRunningCondition(t *testing.T) {
t.Run("should set pending condition to false and set running condition to true", func(t *testing.T) {
conditions := []metav1.Condition{
{
Type: TypePending,
Type: TypeAgentHealthy,
Status: metav1.ConditionTrue,
Reason: ReasonFluentBitDSNotReady,
Message: PendingTypeDeprecationMsg + MessageFor(ReasonFluentBitDSNotReady, LogsMessage),
Reason: ReasonDaemonSetReady,
Message: MessageFor(ReasonDaemonSetReady, LogsMessage),
LastTransitionTime: metav1.Now(),
},
{
Type: TypeConfigurationGenerated,
Status: metav1.ConditionTrue,
Reason: ReasonConfigurationGenerated,
Message: MessageFor(ReasonConfigurationGenerated, LogsMessage),
LastTransitionTime: metav1.Now(),
},
}
generation := int64(1)
reason := ReasonFluentBitDSReady
runningReason := ReasonFluentBitDSReady
pendingReason := ReasonFluentBitDSNotReady

HandleRunningCondition(context.Background(), &conditions, generation, runningReason, pendingReason, "pipeline", LogsMessage)

SetRunningCondition(context.Background(), &conditions, generation, reason, "pipeline", LogsMessage)
conditionsSize := len(conditions)

pendingCond := meta.FindStatusCondition(conditions, TypePending)
require.NotNil(t, pendingCond)
pendingCond := conditions[conditionsSize-2]
require.Equal(t, TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionFalse, pendingCond.Status)
require.Equal(t, ReasonFluentBitDSNotReady, pendingCond.Reason)
pendingCondMsg := PendingTypeDeprecationMsg + MessageFor(ReasonFluentBitDSNotReady, LogsMessage)
require.Equal(t, pendingReason, pendingCond.Reason)
pendingCondMsg := PendingTypeDeprecationMsg + MessageFor(pendingReason, LogsMessage)
require.Equal(t, pendingCondMsg, pendingCond.Message)
require.Equal(t, generation, pendingCond.ObservedGeneration)
require.NotEmpty(t, pendingCond.LastTransitionTime)

runningCond := meta.FindStatusCondition(conditions, TypeRunning)
require.NotNil(t, runningCond)
runningCond := conditions[conditionsSize-1]
require.Equal(t, TypeRunning, runningCond.Type)
require.Equal(t, metav1.ConditionTrue, runningCond.Status)
require.Equal(t, reason, runningCond.Reason)
runningCondMsg := RunningTypeDeprecationMsg + MessageFor(reason, LogsMessage)
require.Equal(t, runningReason, runningCond.Reason)
runningCondMsg := RunningTypeDeprecationMsg + MessageFor(runningReason, LogsMessage)
require.Equal(t, runningCondMsg, runningCond.Message)
require.Equal(t, generation, runningCond.ObservedGeneration)
require.NotEmpty(t, runningCond.LastTransitionTime)
Expand Down
12 changes: 10 additions & 2 deletions internal/reconciler/logparser/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,18 @@ func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, parser
}

if !fluentBitReady {
conditions.SetPendingCondition(ctx, &parser.Status.Conditions, parser.Generation, conditions.ReasonFluentBitDSNotReady, parser.Name, conditions.LogsMessage)
conditions.HandlePendingCondition(ctx, &parser.Status.Conditions, parser.Generation, conditions.ReasonFluentBitDSNotReady, parser.Name, conditions.LogsMessage)
return

}

conditions.SetRunningCondition(ctx, &parser.Status.Conditions, parser.Generation, conditions.ReasonFluentBitDSReady, parser.Name, conditions.LogsMessage)
conditions.HandleRunningCondition(
ctx,
&parser.Status.Conditions,
parser.Generation,
conditions.ReasonFluentBitDSReady,
conditions.ReasonFluentBitDSNotReady,
parser.Name,
conditions.LogsMessage,
)
}
10 changes: 10 additions & 0 deletions internal/reconciler/logparser/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func TestUpdateStatus(t *testing.T) {
require.NotEmpty(t, agentHealthyCond.LastTransitionTime)

conditionsSize := len(updatedParser.Status.Conditions)

pendingCond := updatedParser.Status.Conditions[conditionsSize-2]
require.Equal(t, conditions.TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionFalse, pendingCond.Status)
require.Equal(t, conditions.ReasonFluentBitDSNotReady, pendingCond.Reason)
pendingCondMsg := conditions.PendingTypeDeprecationMsg + conditions.MessageFor(conditions.ReasonFluentBitDSNotReady, conditions.LogsMessage)
require.Equal(t, pendingCondMsg, pendingCond.Message)
require.Equal(t, updatedParser.Generation, pendingCond.ObservedGeneration)
require.NotEmpty(t, pendingCond.LastTransitionTime)

runningCond := updatedParser.Status.Conditions[conditionsSize-1]
require.Equal(t, conditions.TypeRunning, runningCond.Type)
require.Equal(t, metav1.ConditionTrue, runningCond.Status)
Expand Down
16 changes: 12 additions & 4 deletions internal/reconciler/logpipeline/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ func (r *Reconciler) setFluentBitConfigGeneratedCondition(ctx context.Context, p
func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline) {

if pipeline.Spec.Output.IsLokiDefined() {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonUnsupportedLokiOutput, pipeline.Name, conditions.LogsMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonUnsupportedLokiOutput, pipeline.Name, conditions.LogsMessage)
return
}

referencesNonExistentSecret := secretref.ReferencesNonExistentSecret(ctx, r.Client, pipeline)
if referencesNonExistentSecret {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonReferencedSecretMissing, pipeline.Name, conditions.LogsMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonReferencedSecretMissing, pipeline.Name, conditions.LogsMessage)
return
}

Expand All @@ -119,9 +119,17 @@ func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, pipelin
}

if !fluentBitReady {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonFluentBitDSNotReady, pipeline.Name, conditions.LogsMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonFluentBitDSNotReady, pipeline.Name, conditions.LogsMessage)
return
}

conditions.SetRunningCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonFluentBitDSReady, pipeline.Name, conditions.LogsMessage)
conditions.HandleRunningCondition(
ctx,
&pipeline.Status.Conditions,
pipeline.Generation,
conditions.ReasonFluentBitDSReady,
conditions.ReasonFluentBitDSNotReady,
pipeline.Name,
conditions.LogsMessage,
)
}
10 changes: 10 additions & 0 deletions internal/reconciler/logpipeline/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func TestUpdateStatus(t *testing.T) {
require.NotEmpty(t, agentHealthyCond.LastTransitionTime)

conditionsSize := len(updatedPipeline.Status.Conditions)

pendingCond := updatedPipeline.Status.Conditions[conditionsSize-2]
require.Equal(t, conditions.TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionFalse, pendingCond.Status)
require.Equal(t, conditions.ReasonFluentBitDSNotReady, pendingCond.Reason)
pendingCondMsg := conditions.PendingTypeDeprecationMsg + conditions.MessageFor(conditions.ReasonFluentBitDSNotReady, conditions.LogsMessage)
require.Equal(t, pendingCondMsg, pendingCond.Message)
require.Equal(t, updatedPipeline.Generation, pendingCond.ObservedGeneration)
require.NotEmpty(t, pendingCond.LastTransitionTime)

runningCond := updatedPipeline.Status.Conditions[conditionsSize-1]
require.Equal(t, conditions.TypeRunning, runningCond.Type)
require.Equal(t, metav1.ConditionTrue, runningCond.Status)
Expand Down
18 changes: 13 additions & 5 deletions internal/reconciler/tracepipeline/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, pipelineName string, with
return nil
}

// If the "GatewayHealthy" type doesn't exist in the conditions,
// If the "GatewayHealthy" type doesn't exist in the conditions
// then we need to reset the conditions list to ensure that the "Pending" and "Running" conditions are appended to the end of the conditions list
// Check step 3 in https://github.com/kyma-project/telemetry-manager/blob/main/docs/contributor/arch/004-consolidate-pipeline-statuses.md#decision
if meta.FindStatusCondition(pipeline.Status.Conditions, conditions.TypeGatewayHealthy) == nil {
Expand Down Expand Up @@ -85,13 +85,13 @@ func (r *Reconciler) setGatewayConfigGeneratedCondition(ctx context.Context, pip

func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, pipeline *telemetryv1alpha1.TracePipeline, withinPipelineCountLimit bool) {
if !withinPipelineCountLimit {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonMaxPipelinesExceeded, pipeline.Name, conditions.TracesMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonMaxPipelinesExceeded, pipeline.Name, conditions.TracesMessage)
return
}

referencesNonExistentSecret := secretref.ReferencesNonExistentSecret(ctx, r.Client, pipeline)
if referencesNonExistentSecret {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonReferencedSecretMissing, pipeline.Name, conditions.TracesMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonReferencedSecretMissing, pipeline.Name, conditions.TracesMessage)
return
}

Expand All @@ -102,9 +102,17 @@ func (r *Reconciler) setPendingAndRunningConditions(ctx context.Context, pipelin
}

if !gatewayReady {
conditions.SetPendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonTraceGatewayDeploymentNotReady, pipeline.Name, conditions.TracesMessage)
conditions.HandlePendingCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonTraceGatewayDeploymentNotReady, pipeline.Name, conditions.TracesMessage)
return
}

conditions.SetRunningCondition(ctx, &pipeline.Status.Conditions, pipeline.Generation, conditions.ReasonTraceGatewayDeploymentReady, pipeline.Name, conditions.TracesMessage)
conditions.HandleRunningCondition(
ctx,
&pipeline.Status.Conditions,
pipeline.Generation,
conditions.ReasonTraceGatewayDeploymentReady,
conditions.ReasonTraceGatewayDeploymentNotReady,
pipeline.Name,
conditions.TracesMessage,
)
}
10 changes: 10 additions & 0 deletions internal/reconciler/tracepipeline/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ func TestUpdateStatus(t *testing.T) {
require.NotEmpty(t, gatewayHealthyCond.LastTransitionTime)

conditionsSize := len(updatedPipeline.Status.Conditions)

pendingCond := updatedPipeline.Status.Conditions[conditionsSize-2]
require.Equal(t, conditions.TypePending, pendingCond.Type)
require.Equal(t, metav1.ConditionFalse, pendingCond.Status)
require.Equal(t, conditions.ReasonTraceGatewayDeploymentNotReady, pendingCond.Reason)
pendingCondMsg := conditions.PendingTypeDeprecationMsg + conditions.MessageFor(conditions.ReasonTraceGatewayDeploymentNotReady, conditions.TracesMessage)
require.Equal(t, pendingCondMsg, pendingCond.Message)
require.Equal(t, updatedPipeline.Generation, pendingCond.ObservedGeneration)
require.NotEmpty(t, pendingCond.LastTransitionTime)

runningCond := updatedPipeline.Status.Conditions[conditionsSize-1]
require.Equal(t, conditions.TypeRunning, runningCond.Type)
require.Equal(t, metav1.ConditionTrue, runningCond.Status)
Expand Down
3 changes: 2 additions & 1 deletion test/testkit/verifiers/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func LogPipelineShouldBeHealthy(ctx context.Context, k8sClient client.Client, pi
var pipeline telemetryv1alpha1.LogPipeline
key := types.NamespacedName{Name: pipelineName}
g.Expect(k8sClient.Get(ctx, key, &pipeline)).To(Succeed())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeRunning)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeAgentHealthy)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeConfigurationGenerated)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeRunning)).To(BeTrue())
g.Expect(meta.IsStatusConditionFalse(pipeline.Status.Conditions, conditions.TypePending)).To(BeTrue())
}, periodic.EventuallyTimeout, periodic.DefaultInterval).Should(Succeed())
}
3 changes: 2 additions & 1 deletion test/testkit/verifiers/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ func TracePipelineShouldBeHealthy(ctx context.Context, k8sClient client.Client,
var pipeline telemetryv1alpha1.TracePipeline
key := types.NamespacedName{Name: pipelineName}
g.Expect(k8sClient.Get(ctx, key, &pipeline)).To(Succeed())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeRunning)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeGatewayHealthy)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeConfigurationGenerated)).To(BeTrue())
g.Expect(meta.IsStatusConditionTrue(pipeline.Status.Conditions, conditions.TypeRunning)).To(BeTrue())
g.Expect(meta.IsStatusConditionFalse(pipeline.Status.Conditions, conditions.TypePending)).To(BeTrue())
}, periodic.EventuallyTimeout, periodic.DefaultInterval).Should(Succeed())
}

Expand Down
Loading