Skip to content

Commit

Permalink
stabilize configchecks
Browse files Browse the repository at this point in the history
 - do not create a new configcheck if one is still pending or running
 - sort all logging items before generating the config to get a consistent result
  • Loading branch information
pepov authored and tarokkk committed Feb 26, 2020
1 parent e9a3944 commit 55fe843
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 20 deletions.
72 changes: 72 additions & 0 deletions config/samples/logging_v1alpha2_multiflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
apiVersion: logging.banzaicloud.io/v1beta1
kind: Output
metadata:
name: output-sample
spec:
nullout: {}
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Flow
metadata:
name: sample1
spec:
selectors:
sampleKey: sampleValue1
filters:
- tag_normaliser: {}
- stdout: {}
outputRefs:
- "output-sample"
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Flow
metadata:
name: sample2
spec:
selectors:
sampleKey: sampleValue2
filters:
- tag_normaliser: {}
- stdout: {}
outputRefs:
- "output-sample"
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Flow
metadata:
name: sample3
spec:
selectors:
sampleKey: sampleValue3
filters:
- tag_normaliser: {}
- stdout: {}
outputRefs:
- "output-sample"
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Flow
metadata:
name: sample4
spec:
selectors:
sampleKey: sampleValue4
filters:
- tag_normaliser: {}
- stdout: {}
outputRefs:
- "output-sample"
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Flow
metadata:
name: sample5
spec:
selectors:
sampleKey: sampleValue5
filters:
- tag_normaliser: {}
- stdout: {}
outputRefs:
- "output-sample"
---
65 changes: 60 additions & 5 deletions controllers/logging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"regexp"
"sort"

"emperror.dev/errors"
"github.com/banzaicloud/logging-operator/pkg/resources"
Expand Down Expand Up @@ -257,8 +258,19 @@ func (r *LoggingReconciler) GetResources(logging *loggingv1beta1.Logging) (*mode
if err != nil {
return nil, err
}

if len(clusterFlows.Items) > 0 {
for _, i := range clusterFlows.Items {
items := clusterFlows.Items
sort.Slice(items, func(i, j int) bool {
if items[i].GetNamespace() < items[j].GetNamespace() {
return true
}
if items[i].GetNamespace() == items[j].GetNamespace() {
return items[i].GetName() < items[j].GetName()
}
return false
})
for _, i := range items {
if i.Spec.LoggingRef == logging.Spec.LoggingRef {
loggingResources.ClusterFlows = append(loggingResources.ClusterFlows, i)
}
Expand All @@ -270,8 +282,19 @@ func (r *LoggingReconciler) GetResources(logging *loggingv1beta1.Logging) (*mode
if err != nil {
return nil, err
}

if len(clusterOutputs.Items) > 0 {
for _, i := range clusterOutputs.Items {
items := clusterOutputs.Items
sort.Slice(items, func(i, j int) bool {
if items[i].GetNamespace() < items[j].GetNamespace() {
return true
}
if items[i].GetNamespace() == items[j].GetNamespace() {
return items[i].GetName() < items[j].GetName()
}
return false
})
for _, i := range items {
if i.Spec.LoggingRef == logging.Spec.LoggingRef {
loggingResources.ClusterOutputs = append(loggingResources.ClusterOutputs, i)
}
Expand All @@ -286,7 +309,17 @@ func (r *LoggingReconciler) GetResources(logging *loggingv1beta1.Logging) (*mode
if err != nil {
return nil, errors.WrapIf(err, "failed to list all namespaces")
}
for _, ns := range nsList.Items {
items := nsList.Items
sort.Slice(items, func(i, j int) bool {
if items[i].GetNamespace() < items[j].GetNamespace() {
return true
}
if items[i].GetNamespace() == items[j].GetNamespace() {
return items[i].GetName() < items[j].GetName()
}
return false
})
for _, ns := range items {
watchNamespaces = append(watchNamespaces, ns.Name)
}
}
Expand All @@ -297,20 +330,42 @@ func (r *LoggingReconciler) GetResources(logging *loggingv1beta1.Logging) (*mode
if err != nil {
return nil, err
}

if len(flows.Items) > 0 {
for _, i := range flows.Items {
items := flows.Items
sort.Slice(items, func(i, j int) bool {
if items[i].GetNamespace() < items[j].GetNamespace() {
return true
}
if items[i].GetNamespace() == items[j].GetNamespace() {
return items[i].GetName() < items[j].GetName()
}
return false
})
for _, i := range items {
if i.Spec.LoggingRef == logging.Spec.LoggingRef {
loggingResources.Flows = append(loggingResources.Flows, i)
}
}
}

outputs := &loggingv1beta1.OutputList{}
err = r.List(context.TODO(), outputs, client.InNamespace(ns))
if err != nil {
return nil, err
}
if len(outputs.Items) > 0 {
for _, i := range outputs.Items {
items := outputs.Items
sort.Slice(items, func(i, j int) bool {
if items[i].GetNamespace() < items[j].GetNamespace() {
return true
}
if items[i].GetNamespace() == items[j].GetNamespace() {
return items[i].GetName() < items[j].GetName()
}
return false
})
for _, i := range items {
if i.Spec.LoggingRef == logging.Spec.LoggingRef {
loggingResources.Outputs = append(loggingResources.Outputs, i)
}
Expand Down
39 changes: 35 additions & 4 deletions pkg/resources/fluentd/appconfigmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ConfigCheckResult struct {
Valid bool
Ready bool
Valid bool
Ready bool
Message string
}

const ConfigKey = "generated.conf"
Expand Down Expand Up @@ -58,8 +60,37 @@ func (r *Reconciler) configCheck() (*ConfigCheckResult, error) {
if err != nil {
return nil, err
}

pod := r.newCheckPod(hashKey)

existingPods := &v1.PodList{}
err = r.Client.List(context.TODO(), existingPods, client.MatchingLabels(pod.Labels))
if err != nil {
return nil, errors.WrapIf(err, "failed to list existing configcheck pods")
}

podsByPhase := make(map[v1.PodPhase]int)
for _, p := range existingPods.Items {
if actual, ok := podsByPhase[p.Status.Phase]; ok {
podsByPhase[p.Status.Phase] = actual + 1
} else {
podsByPhase[p.Status.Phase] = 1
}
}

if podsByPhase[v1.PodPending] > 0 {
return &ConfigCheckResult{
Ready: false,
Message: "there are pending configcheck pods, need to back off",
}, nil
}
if podsByPhase[v1.PodRunning] > 0 {
return &ConfigCheckResult{
Ready: false,
Message: "there are running configcheck pods, need to back off",
}, nil
}

err = r.Client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, pod)
if err == nil {
// check pod status and write into the configmap
Expand Down Expand Up @@ -151,7 +182,7 @@ func (r *Reconciler) configCheckCleanup(currentHash string) ([]string, error) {

func (r *Reconciler) newCheckSecret(hashKey string) *v1.Secret {
return &v1.Secret{
ObjectMeta: r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-%s", hashKey), ComponentFluentd),
ObjectMeta: r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-%s", hashKey), ComponentConfigCheck),
Data: map[string][]byte{
ConfigKey: []byte(*r.config),
},
Expand All @@ -164,7 +195,7 @@ func (r *Reconciler) newCheckOutputSecret(hashKey string) (*v1.Secret, error) {
return nil, err
}
if secret, ok := obj.(*v1.Secret); ok {
secret.ObjectMeta = r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-output-%s", hashKey), ComponentFluentd)
secret.ObjectMeta = r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-output-%s", hashKey), ComponentConfigCheck)
return secret, nil
}
return nil, errors.New("output secret is invalid, unable to create output secret for config check")
Expand Down
27 changes: 16 additions & 11 deletions pkg/resources/fluentd/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,17 @@ func (r *Reconciler) Reconcile() (*reconcile.Result, error) {
var removedHashes []string
if removedHashes, err = r.configCheckCleanup(hash); err != nil {
r.Log.Error(err, "failed to cleanup resources")
}
if len(removedHashes) > 0 {
for _, removedHash := range removedHashes {
delete(r.Logging.Status.ConfigCheckResults, removedHash)
}
if err := r.Client.Status().Update(context.TODO(), r.Logging); err != nil {
return nil, errors.WrapWithDetails(err, "failed to update status", "logging", r.Logging)
} else {
// explicitly ask for a requeue to short circuit the controller loop after the status update
return &reconcile.Result{Requeue: true}, nil
} else {
if len(removedHashes) > 0 {
for _, removedHash := range removedHashes {
delete(r.Logging.Status.ConfigCheckResults, removedHash)
}
if err := r.Client.Status().Update(context.TODO(), r.Logging); err != nil {
return nil, errors.WrapWithDetails(err, "failed to update status", "logging", r.Logging)
} else {
// explicitly ask for a requeue to short circuit the controller loop after the status update
return &reconcile.Result{Requeue: true}, nil
}
}
}
} else {
Expand All @@ -139,7 +140,11 @@ func (r *Reconciler) Reconcile() (*reconcile.Result, error) {
return &reconcile.Result{Requeue: true}, nil
}
} else {
r.Log.Info("still waiting for the configcheck result...")
if result.Message != "" {
r.Log.Info(result.Message)
} else {
r.Log.Info("still waiting for the configcheck result...")
}
return &reconcile.Result{RequeueAfter: time.Second}, nil
}
}
Expand Down

0 comments on commit 55fe843

Please sign in to comment.