From 55fe843799a2f4d2792a5721fb3f1871161b0b0a Mon Sep 17 00:00:00 2001 From: pepov Date: Wed, 26 Feb 2020 14:55:39 +0100 Subject: [PATCH] stabilize configchecks - do not create new configcheck if one is still pending and running - sort all logging items before generating the config to get a consistent result --- .../samples/logging_v1alpha2_multiflow.yaml | 72 +++++++++++++++++++ controllers/logging_controller.go | 65 +++++++++++++++-- pkg/resources/fluentd/appconfigmap.go | 39 ++++++++-- pkg/resources/fluentd/fluentd.go | 27 ++++--- 4 files changed, 183 insertions(+), 20 deletions(-) create mode 100644 config/samples/logging_v1alpha2_multiflow.yaml diff --git a/config/samples/logging_v1alpha2_multiflow.yaml b/config/samples/logging_v1alpha2_multiflow.yaml new file mode 100644 index 000000000..884ada347 --- /dev/null +++ b/config/samples/logging_v1alpha2_multiflow.yaml @@ -0,0 +1,72 @@ +apiVersion: logging.banzaicloud.io/v1beta1 +kind: Output +metadata: + name: output-sample +spec: + nullout: {} +--- +apiVersion: logging.banzaicloud.io/v1beta1 +kind: Flow +metadata: + name: sample1 +spec: + selectors: + sampleKey: sampleValue1 + filters: + - tag_normaliser: {} + - stdout: {} + outputRefs: + - "output-sample" +--- +apiVersion: logging.banzaicloud.io/v1beta1 +kind: Flow +metadata: + name: sample2 +spec: + selectors: + sampleKey: sampleValue2 + filters: + - tag_normaliser: {} + - stdout: {} + outputRefs: + - "output-sample" +--- +apiVersion: logging.banzaicloud.io/v1beta1 +kind: Flow +metadata: + name: sample3 +spec: + selectors: + sampleKey: sampleValue3 + filters: + - tag_normaliser: {} + - stdout: {} + outputRefs: + - "output-sample" +--- +apiVersion: logging.banzaicloud.io/v1beta1 +kind: Flow +metadata: + name: sample4 +spec: + selectors: + sampleKey: sampleValue4 + filters: + - tag_normaliser: {} + - stdout: {} + outputRefs: + - "output-sample" +--- +apiVersion: logging.banzaicloud.io/v1beta1 +kind: Flow +metadata: + name: sample5 +spec: + selectors: + sampleKey: sampleValue5 + filters: + - tag_normaliser: {} + - stdout: {} + outputRefs: + - "output-sample" +--- diff --git a/controllers/logging_controller.go b/controllers/logging_controller.go index 98726dad6..51ae1a436 100644 --- a/controllers/logging_controller.go +++ b/controllers/logging_controller.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "regexp" + "sort" "emperror.dev/errors" "github.com/banzaicloud/logging-operator/pkg/resources" @@ -257,8 +258,19 @@ func (r *LoggingReconciler) GetResources(logging *loggingv1beta1.Logging) (*mode if err != nil { return nil, err } + if len(clusterFlows.Items) > 0 { - for _, i := range clusterFlows.Items { + items := clusterFlows.Items + sort.Slice(items, func(i, j int) bool { + if items[i].GetNamespace() < items[j].GetNamespace() { + return true + } + if items[i].GetNamespace() == items[j].GetNamespace() { + return items[i].GetName() < items[j].GetName() + } + return false + }) + for _, i := range items { if i.Spec.LoggingRef == logging.Spec.LoggingRef { loggingResources.ClusterFlows = append(loggingResources.ClusterFlows, i) } @@ -270,8 +282,19 @@ func (r *LoggingReconciler) GetResources(logging *loggingv1beta1.Logging) (*mode if err != nil { return nil, err } + if len(clusterOutputs.Items) > 0 { - for _, i := range clusterOutputs.Items { + items := clusterOutputs.Items + sort.Slice(items, func(i, j int) bool { + if items[i].GetNamespace() < items[j].GetNamespace() { + return true + } + if items[i].GetNamespace() == items[j].GetNamespace() { + return items[i].GetName() < items[j].GetName() + } + return false + }) + for _, i := range items { if i.Spec.LoggingRef == logging.Spec.LoggingRef { loggingResources.ClusterOutputs = append(loggingResources.ClusterOutputs, i) } @@ -286,7 +309,17 @@ func (r *LoggingReconciler) GetResources(logging *loggingv1beta1.Logging) (*mode if err != nil { return nil, errors.WrapIf(err, "failed to list all namespaces") } - for _, ns := range nsList.Items { + items := nsList.Items + sort.Slice(items, func(i, j int) bool { + if items[i].GetNamespace() < items[j].GetNamespace() { + return true + } + if items[i].GetNamespace() == items[j].GetNamespace() { + return items[i].GetName() < items[j].GetName() + } + return false + }) + for _, ns := range items { watchNamespaces = append(watchNamespaces, ns.Name) } } @@ -297,20 +330,42 @@ func (r *LoggingReconciler) GetResources(logging *loggingv1beta1.Logging) (*mode if err != nil { return nil, err } + if len(flows.Items) > 0 { - for _, i := range flows.Items { + items := flows.Items + sort.Slice(items, func(i, j int) bool { + if items[i].GetNamespace() < items[j].GetNamespace() { + return true + } + if items[i].GetNamespace() == items[j].GetNamespace() { + return items[i].GetName() < items[j].GetName() + } + return false + }) + for _, i := range items { if i.Spec.LoggingRef == logging.Spec.LoggingRef { loggingResources.Flows = append(loggingResources.Flows, i) } } } + outputs := &loggingv1beta1.OutputList{} err = r.List(context.TODO(), outputs, client.InNamespace(ns)) if err != nil { return nil, err } if len(outputs.Items) > 0 { - for _, i := range outputs.Items { + items := outputs.Items + sort.Slice(items, func(i, j int) bool { + if items[i].GetNamespace() < items[j].GetNamespace() { + return true + } + if items[i].GetNamespace() == items[j].GetNamespace() { + return items[i].GetName() < items[j].GetName() + } + return false + }) + for _, i := range items { if i.Spec.LoggingRef == logging.Spec.LoggingRef { loggingResources.Outputs = append(loggingResources.Outputs, i) } diff --git a/pkg/resources/fluentd/appconfigmap.go b/pkg/resources/fluentd/appconfigmap.go index 6b1385435..937699747 100644 --- a/pkg/resources/fluentd/appconfigmap.go +++ b/pkg/resources/fluentd/appconfigmap.go @@ -26,11 +26,13 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) type ConfigCheckResult struct { - Valid bool - Ready bool + Valid bool + Ready bool + Message string } const ConfigKey = "generated.conf" @@ -58,8 +60,37 @@ func (r *Reconciler) configCheck() (*ConfigCheckResult, error) { if err != nil { return nil, err } + pod := r.newCheckPod(hashKey) + existingPods := &v1.PodList{} + err = r.Client.List(context.TODO(), existingPods, client.MatchingLabels(pod.Labels)) + if err != nil { + return nil, errors.WrapIf(err, "failed to list existing configcheck pods") + } + + podsByPhase := make(map[v1.PodPhase]int) + for _, p := range existingPods.Items { + if actual, ok := podsByPhase[p.Status.Phase]; ok { + podsByPhase[p.Status.Phase] = actual + 1 + } else { + podsByPhase[p.Status.Phase] = 1 + } + } + + if podsByPhase[v1.PodPending] > 0 { + return &ConfigCheckResult{ + Ready: false, + Message: "there are pending configcheck pods, need to back off", + }, nil + } + if podsByPhase[v1.PodRunning] > 0 { + return &ConfigCheckResult{ + Ready: false, + Message: "there are running configcheck pods, need to back off", + }, nil + } + err = r.Client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, pod) if err == nil { // check pod status and write into the configmap @@ -151,7 +182,7 @@ func (r *Reconciler) configCheckCleanup(currentHash string) ([]string, error) { func (r *Reconciler) newCheckSecret(hashKey string) *v1.Secret { return &v1.Secret{ - ObjectMeta: r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-%s", hashKey), ComponentFluentd), + ObjectMeta: r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-%s", hashKey), ComponentConfigCheck), Data: map[string][]byte{ ConfigKey: []byte(*r.config), }, @@ -164,7 +195,7 @@ func (r *Reconciler) newCheckOutputSecret(hashKey string) (*v1.Secret, error) { return nil, err } if secret, ok := obj.(*v1.Secret); ok { - secret.ObjectMeta = r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-output-%s", hashKey), ComponentFluentd) + secret.ObjectMeta = r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-output-%s", hashKey), ComponentConfigCheck) return secret, nil } return nil, errors.New("output secret is invalid, unable to create output secret for config check") diff --git a/pkg/resources/fluentd/fluentd.go b/pkg/resources/fluentd/fluentd.go index e512cf9d0..f566b2f0b 100644 --- a/pkg/resources/fluentd/fluentd.go +++ b/pkg/resources/fluentd/fluentd.go @@ -110,16 +110,17 @@ func (r *Reconciler) Reconcile() (*reconcile.Result, error) { var removedHashes []string if removedHashes, err = r.configCheckCleanup(hash); err != nil { r.Log.Error(err, "failed to cleanup resources") - } - if len(removedHashes) > 0 { - for _, removedHash := range removedHashes { - delete(r.Logging.Status.ConfigCheckResults, removedHash) - } - if err := r.Client.Status().Update(context.TODO(), r.Logging); err != nil { - return nil, errors.WrapWithDetails(err, "failed to update status", "logging", r.Logging) - } else { - // explicitly ask for a requeue to short circuit the controller loop after the status update - return &reconcile.Result{Requeue: true}, nil + } else { + if len(removedHashes) > 0 { + for _, removedHash := range removedHashes { + delete(r.Logging.Status.ConfigCheckResults, removedHash) + } + if err := r.Client.Status().Update(context.TODO(), r.Logging); err != nil { + return nil, errors.WrapWithDetails(err, "failed to update status", "logging", r.Logging) + } else { + // explicitly ask for a requeue to short circuit the controller loop after the status update + return &reconcile.Result{Requeue: true}, nil + } } } } else { @@ -139,7 +140,11 @@ func (r *Reconciler) Reconcile() (*reconcile.Result, error) { return &reconcile.Result{Requeue: true}, nil } } else { - r.Log.Info("still waiting for the configcheck result...") + if result.Message != "" { + r.Log.Info(result.Message) + } else { + r.Log.Info("still waiting for the configcheck result...") + } return &reconcile.Result{RequeueAfter: time.Second}, nil } }