From 19cc33be120101b7eee0f64dee2e44f33be12a88 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 26 Jul 2024 15:41:52 +0200 Subject: [PATCH] Send notifications events to Icinga Notifications daemon Co-Authored-By: Eric Lippmann --- cmd/icinga-kubernetes/main.go | 86 +++++++++++++++++++++++---- {internal => pkg/daemon}/config.go | 2 +- pkg/notifications/client.go | 95 ++++++++++++++++++++++++++++++ pkg/schema/v1/container.go | 32 +++++++++- pkg/schema/v1/daemon_set.go | 22 +++++++ pkg/schema/v1/deployment.go | 22 +++++++ pkg/schema/v1/icinga_state.go | 17 ++++++ pkg/schema/v1/node.go | 22 +++++++ pkg/schema/v1/pod.go | 22 +++++++ pkg/schema/v1/replica_set.go | 22 +++++++ pkg/schema/v1/stateful_set.go | 22 +++++++ 11 files changed, 349 insertions(+), 15 deletions(-) rename {internal => pkg/daemon}/config.go (98%) create mode 100644 pkg/notifications/client.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index d64eb7c6..790bac93 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -9,6 +9,7 @@ import ( "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-kubernetes/internal" "github.com/icinga/icinga-kubernetes/pkg/com" + "github.com/icinga/icinga-kubernetes/pkg/daemon" "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/metrics" "github.com/icinga/icinga-kubernetes/pkg/notifications" @@ -61,7 +62,7 @@ func main() { factory := informers.NewSharedInformerFactory(clientset, 0) log := klog.NewKlogr() - var cfg internal.Config + var cfg daemon.Config err = config.FromYAMLFile(configLocation, &cfg) if err != nil { klog.Fatal(errors.Wrap(err, "can't create configuration")) @@ -93,9 +94,13 @@ func main() { } } + var nclient *notifications.Client if err := notifications.SyncSourceConfig(context.Background(), db, &cfg.Notifications); err != nil { klog.Fatal(err) } + if cfg.Notifications.Url != "" { + nclient = notifications.NewClient(db, cfg.Notifications) + } g, ctx := errgroup.WithContext(context.Background()) @@ -135,42 +140,97 @@ func main() { return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode) + nodes := internal.NewMultiplex() + if cfg.Notifications.Url != "" { + nodesOut := nodes.Out() + g.Go(func() error { return nclient.Stream(ctx, nodesOut) }) + } - return s.Run(ctx) + nodesIn := nodes.In() + g.Go(func() error { return nodes.Do(ctx) }) + + s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode) + return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(nodesIn))) }) g.Go(func() error { - pods := make(chan any) - deletePodIds := make(chan interface{}) - defer close(pods) - defer close(deletePodIds) + pods := internal.NewMultiplex() + deletedPodUuids := internal.NewMultiplex() - schemav1.SyncContainers(ctx, db, g, pods, deletePodIds) + if cfg.Notifications.Url != "" { + podsOut := pods.Out() + g.Go(func() error { return nclient.Stream(ctx, podsOut) }) + } + + containers := make(chan any) + g.Go(func() error { return nclient.Stream(ctx, containers) }) + + schemav1.SyncContainers(ctx, db, g, pods.Out(), deletedPodUuids.Out(), containers) f := schemav1.NewPodFactory(clientset) s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New) - return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(pods)), sync.WithOnDelete(com.ForwardBulk(deletePodIds))) + podsIn := pods.In() + deletedIn := deletedPodUuids.In() + + g.Go(func() error { return pods.Do(ctx) }) + g.Go(func() error { return deletedPodUuids.Do(ctx) }) + + return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(podsIn)), sync.WithOnDelete(com.ForwardBulk(deletedIn))) }) g.Go(func() error { + deployments := internal.NewMultiplex() + if cfg.Notifications.Url != "" { + deploymentsOut := deployments.Out() + g.Go(func() error { return nclient.Stream(ctx, deploymentsOut) }) + } s := syncv1.NewSync(db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment) - return s.Run(ctx) + deploymentsIn := deployments.In() + g.Go(func() error { return deployments.Do(ctx) }) + + return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(deploymentsIn))) }) g.Go(func() error { + daemonSet := internal.NewMultiplex() + if cfg.Notifications.Url != "" { + daemonSetOut := daemonSet.Out() + g.Go(func() error { return nclient.Stream(ctx, daemonSetOut) }) + } + + daemonSetIn := daemonSet.In() + g.Go(func() error { return daemonSet.Do(ctx) }) + s := syncv1.NewSync(db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet) - return s.Run(ctx) + return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(daemonSetIn))) }) g.Go(func() error { + replicaSet := internal.NewMultiplex() + if cfg.Notifications.Url != "" { + replicaSetOut := replicaSet.Out() + g.Go(func() error { return nclient.Stream(ctx, replicaSetOut) }) + } + + replicaSetIn := replicaSet.In() + g.Go(func() error { return replicaSet.Do(ctx) }) + s := syncv1.NewSync(db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet) - return s.Run(ctx) + return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(replicaSetIn))) }) g.Go(func() error { + statefulSet := internal.NewMultiplex() + if cfg.Notifications.Url != "" { + statefulSetOut := statefulSet.Out() + g.Go(func() error { return nclient.Stream(ctx, statefulSetOut) }) + } + + statefulSetIn := statefulSet.In() + g.Go(func() error { return statefulSet.Do(ctx) }) + s := syncv1.NewSync(db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet) - return s.Run(ctx) + return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(statefulSetIn))) }) g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService) diff --git a/internal/config.go b/pkg/daemon/config.go similarity index 98% rename from internal/config.go rename to pkg/daemon/config.go index 9753d01c..9fd47258 100644 --- a/internal/config.go +++ b/pkg/daemon/config.go @@ -1,4 +1,4 @@ -package internal +package daemon import ( "github.com/icinga/icinga-go-library/database" diff --git a/pkg/notifications/client.go b/pkg/notifications/client.go new file mode 100644 index 00000000..69de9e69 --- /dev/null +++ b/pkg/notifications/client.go @@ -0,0 +1,95 @@ +package notifications + +import ( + "bytes" + "context" + "encoding/json" + "github.com/icinga/icinga-kubernetes/internal" + "github.com/icinga/icinga-kubernetes/pkg/database" + "github.com/pkg/errors" + "io" + "k8s.io/klog/v2" + "net/http" + "net/url" +) + +// Notifiable can be implemented by all k8s types that want to submit notification events to Icinga Notifications. +type Notifiable interface { + // GetNotificationsEvent returns the event data of this type that will be transmitted to Icinga Notifications. + GetNotificationsEvent(baseUrl *url.URL) map[string]any +} + +type Client struct { + db *database.Database + client http.Client + Config +} + +func NewClient(db *database.Database, c Config) *Client { + return &Client{db: db, client: http.Client{}, Config: c} +} + +func (c *Client) ProcessEvent(ctx context.Context, notifiable Notifiable) error { + var username, password string + if !IsAutoCreationEnabled(&c.Config) { + username = c.Config.Username + password = c.Config.Password + } else { + var err error + if username, password, err = retrieveCredentials(ctx, c.db); err != nil { + return err + } + } + + baseUrl, err := url.Parse(c.Config.KubernetesWebUrl) + if err != nil { + return errors.Wrapf(err, "cannot parse Icinga for Kubernetes Web URL: %q", c.Config.KubernetesWebUrl) + } + + body, err := json.Marshal(notifiable.GetNotificationsEvent(baseUrl)) + if err != nil { + return err + } + + r, err := http.NewRequest(http.MethodPost, c.Config.Url+"/process-event", bytes.NewBuffer(body)) + if err != nil { + return errors.Wrap(err, "cannot create new http request") + } + + r.SetBasicAuth(username, password) + r.Header.Set("User-Agent", "icinga-kubernetes/"+internal.Version.Version) + r.Header.Add("Content-Type", "application/json") + + res, err := c.client.Do(r) + if err != nil { + return errors.Wrap(err, "cannot send http request") + } + defer func() { + _, _ = io.Copy(io.Discard, res.Body) + _ = res.Body.Close() + }() + + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAlreadyReported { + return errors.Errorf("unexpected http status code: %d", res.StatusCode) + } + + return nil +} + +// Stream consumes the items from the given `entities` chan and triggers a notifications event for each of them. +func (c *Client) Stream(ctx context.Context, entities <-chan any) error { + for { + select { + case entity, more := <-entities: + if !more { + return nil + } + + if err := c.ProcessEvent(ctx, entity.(Notifiable)); err != nil { + klog.Error(err) + } + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/pkg/schema/v1/container.go b/pkg/schema/v1/container.go index da199abf..98054fb0 100644 --- a/pkg/schema/v1/container.go +++ b/pkg/schema/v1/container.go @@ -14,6 +14,7 @@ import ( kcorev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/kubernetes" + "net/url" "strings" "sync" "time" @@ -202,6 +203,27 @@ func NewContainer(podUuid types.UUID, container kcorev1.Container, status kcorev return c } +// GetNotificationsEvent implements the notifications.Notifiable interface. +func (c *Container) GetNotificationsEvent(baseUrl *url.URL) map[string]any { + containerUrl := baseUrl.JoinPath("/container") + containerUrl.RawQuery = fmt.Sprintf("id=%s", c.Uuid) + + return map[string]any{ + "name": c.Name, + "severity": c.IcingaState.ToSeverity(), + "message": c.IcingaStateReason, + "url": containerUrl.String(), + "tags": map[string]any{ + "uuid": c.Uuid.String(), + }, + "extra_tags": map[string]any{ + "pod_uuid": c.PodUuid, + "name": c.Name, + "resource": "container", + }, + } +} + type ContainerDevice struct { ContainerUuid types.UUID PodUuid types.UUID @@ -409,7 +431,9 @@ func GetContainerState(container kcorev1.Container, status kcorev1.ContainerStat // When pods are deleted, their IDs are streamed through the `deletePods` chan, and this fetches all the container // IDs matching the respective pod ID from the database and initiates a container deletion stream that cleans up all // container-related resources. -func SyncContainers(ctx context.Context, db *database.Database, g *errgroup.Group, upsertPods <-chan interface{}, deletePods <-chan interface{}) { +func SyncContainers( + ctx context.Context, db *database.Database, g *errgroup.Group, upsertPods, deletePods <-chan any, containerOut chan any, +) { type containerFingerprint struct { Uuid types.UUID PodUuid types.UUID @@ -539,6 +563,12 @@ func SyncContainers(ctx context.Context, db *database.Database, g *errgroup.Grou delete(containerLogs, container.Uuid.String()) containerLogsMu.Unlock() } + + select { + case containerOut <- container: + case <-ctx.Done(): + return ctx.Err() + } } } } diff --git a/pkg/schema/v1/daemon_set.go b/pkg/schema/v1/daemon_set.go index 8c0069ba..8c3a3ce3 100644 --- a/pkg/schema/v1/daemon_set.go +++ b/pkg/schema/v1/daemon_set.go @@ -10,6 +10,7 @@ import ( kruntime "k8s.io/apimachinery/pkg/runtime" kserializer "k8s.io/apimachinery/pkg/runtime/serializer" kjson "k8s.io/apimachinery/pkg/runtime/serializer/json" + "net/url" "strings" ) @@ -117,6 +118,27 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { d.Yaml = string(output) } +// GetNotificationsEvent implements the notifications.Notifiable interface. +func (d *DaemonSet) GetNotificationsEvent(baseUrl *url.URL) map[string]any { + daemonSetUrl := baseUrl.JoinPath("/daemonset") + daemonSetUrl.RawQuery = fmt.Sprintf("id=%s", d.Uuid) + + return map[string]any{ + "name": d.Namespace + "/" + d.Name, + "severity": d.IcingaState.ToSeverity(), + "message": d.IcingaStateReason, + "url": daemonSetUrl.String(), + "tags": map[string]any{ + "uuid": d.Uuid.String(), + }, + "extra_tags": map[string]any{ + "namespace": d.Namespace, + "name": d.Name, + "resource": "daemon_set", + }, + } +} + func (d *DaemonSet) getIcingaState() (IcingaState, string) { if d.DesiredNumberScheduled < 1 { reason := fmt.Sprintf("DaemonSet %s/%s has an invalid desired node count: %d.", d.Namespace, d.Name, d.DesiredNumberScheduled) diff --git a/pkg/schema/v1/deployment.go b/pkg/schema/v1/deployment.go index 65ff932d..07be9f15 100644 --- a/pkg/schema/v1/deployment.go +++ b/pkg/schema/v1/deployment.go @@ -12,6 +12,7 @@ import ( kserializer "k8s.io/apimachinery/pkg/runtime/serializer" kjson "k8s.io/apimachinery/pkg/runtime/serializer/json" ktypes "k8s.io/apimachinery/pkg/types" + "net/url" "strings" ) @@ -170,6 +171,27 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { d.Yaml = string(output) } +// GetNotificationsEvent implements the notifications.Notifiable interface. +func (d *Deployment) GetNotificationsEvent(baseUrl *url.URL) map[string]any { + deploymentUrl := baseUrl.JoinPath("/deployment") + deploymentUrl.RawQuery = fmt.Sprintf("id=%s", d.Uuid) + + return map[string]any{ + "name": d.Namespace + "/" + d.Name, + "severity": d.IcingaState.ToSeverity(), + "message": d.IcingaStateReason, + "url": deploymentUrl.String(), + "tags": map[string]any{ + "uuid": d.Uuid.String(), + }, + "extra_tags": map[string]any{ + "namespace": d.Namespace, + "name": d.Name, + "resource": "deployment", + }, + } +} + func (d *Deployment) getIcingaState() (IcingaState, string) { if gracePeriodReason := IsWithinGracePeriod(d); gracePeriodReason != nil { return Ok, *gracePeriodReason diff --git a/pkg/schema/v1/icinga_state.go b/pkg/schema/v1/icinga_state.go index 210a1739..f02ffd00 100644 --- a/pkg/schema/v1/icinga_state.go +++ b/pkg/schema/v1/icinga_state.go @@ -37,6 +37,23 @@ func (s IcingaState) Value() (driver.Value, error) { return s.String(), nil } +func (s IcingaState) ToSeverity() string { + switch s { + case Ok: + return "ok" + case Pending: + return "info" + case Unknown: + return "err" + case Warning: + return "warning" + case Critical: + return "crit" + default: + panic(fmt.Sprintf("invalid Icinga state %d", s)) + } +} + // Assert interface compliance. var ( _ fmt.Stringer = (*IcingaState)(nil) diff --git a/pkg/schema/v1/node.go b/pkg/schema/v1/node.go index 1b1c3b1f..592cae88 100644 --- a/pkg/schema/v1/node.go +++ b/pkg/schema/v1/node.go @@ -12,6 +12,7 @@ import ( kjson "k8s.io/apimachinery/pkg/runtime/serializer/json" knet "k8s.io/utils/net" "net" + "net/url" "strings" ) @@ -190,6 +191,27 @@ func (n *Node) Obtain(k8s kmetav1.Object) { } } +// GetNotificationsEvent implements the notifications.Notifiable interface. +func (n *Node) GetNotificationsEvent(baseUrl *url.URL) map[string]any { + nodeUrl := baseUrl.JoinPath("/node") + nodeUrl.RawQuery = fmt.Sprintf("id=%s", n.Uuid) + + return map[string]any{ + "name": n.Name, + "severity": n.IcingaState.ToSeverity(), + "message": n.IcingaStateReason, + "url": nodeUrl.String(), + "tags": map[string]any{ + "uuid": n.Uuid.String(), + }, + "extra_tags": map[string]any{ + "namespace": n.Namespace, + "name": n.Name, + "resource": "node", + }, + } +} + func (n *Node) getIcingaState(node *kcorev1.Node) (IcingaState, string) { //if node.Status.Phase == kcorev1.NodePending { // return Pending, fmt.Sprintf("Node %s is pending.", node.Name) diff --git a/pkg/schema/v1/pod.go b/pkg/schema/v1/pod.go index db9628c6..c37923aa 100644 --- a/pkg/schema/v1/pod.go +++ b/pkg/schema/v1/pod.go @@ -13,6 +13,7 @@ import ( kjson "k8s.io/apimachinery/pkg/runtime/serializer/json" ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" + "net/url" "strings" ) @@ -245,6 +246,27 @@ func (p *Pod) Obtain(k8s kmetav1.Object) { p.Yaml = string(output) } +// GetNotificationsEvent implements the notifications.Notifiable interface. +func (p *Pod) GetNotificationsEvent(baseUrl *url.URL) map[string]any { + podUrl := baseUrl.JoinPath("/pod") + podUrl.RawQuery = fmt.Sprintf("id=%s", p.Uuid) + + return map[string]any{ + "name": p.Namespace + "/" + p.Name, + "severity": p.IcingaState.ToSeverity(), + "message": p.IcingaStateReason, + "url": podUrl.String(), + "tags": map[string]any{ + "uuid": p.Uuid.String(), + }, + "extra_tags": map[string]any{ + "namespace": p.Namespace, + "name": p.Name, + "resource": "pod", + }, + } +} + func (p *Pod) getIcingaState(pod *kcorev1.Pod) (IcingaState, string) { if pod.DeletionTimestamp != nil { if pod.Status.Reason == "NodeLost" { diff --git a/pkg/schema/v1/replica_set.go b/pkg/schema/v1/replica_set.go index b9f0ac28..45b9aed6 100644 --- a/pkg/schema/v1/replica_set.go +++ b/pkg/schema/v1/replica_set.go @@ -12,6 +12,7 @@ import ( kserializer "k8s.io/apimachinery/pkg/runtime/serializer" kjson "k8s.io/apimachinery/pkg/runtime/serializer/json" ktypes "k8s.io/apimachinery/pkg/types" + "net/url" "strings" ) @@ -153,6 +154,27 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { r.Yaml = string(output) } +// GetNotificationsEvent implements the notifications.Notifiable interface. +func (r *ReplicaSet) GetNotificationsEvent(baseUrl *url.URL) map[string]any { + replicaSetUrl := baseUrl.JoinPath("/replicaset") + replicaSetUrl.RawQuery = fmt.Sprintf("id=%s", r.Uuid) + + return map[string]any{ + "name": r.Namespace + "/" + r.Name, + "severity": r.IcingaState.ToSeverity(), + "message": r.IcingaStateReason, + "url": replicaSetUrl.String(), + "tags": map[string]any{ + "uuid": r.Uuid.String(), + }, + "extra_tags": map[string]any{ + "namespace": r.Namespace, + "name": r.Name, + "resource": "replica_set", + }, + } +} + func (r *ReplicaSet) getIcingaState() (IcingaState, string) { if r.DesiredReplicas < 1 { reason := fmt.Sprintf("ReplicaSet %s/%s has an invalid desired replica count: %d.", r.Namespace, r.Name, r.DesiredReplicas) diff --git a/pkg/schema/v1/stateful_set.go b/pkg/schema/v1/stateful_set.go index a434a912..2c8278d7 100644 --- a/pkg/schema/v1/stateful_set.go +++ b/pkg/schema/v1/stateful_set.go @@ -10,6 +10,7 @@ import ( kruntime "k8s.io/apimachinery/pkg/runtime" kserializer "k8s.io/apimachinery/pkg/runtime/serializer" kjson "k8s.io/apimachinery/pkg/runtime/serializer/json" + "net/url" "strings" ) @@ -141,6 +142,27 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { s.Yaml = string(output) } +// GetNotificationsEvent implements the notifications.Notifiable interface. +func (s *StatefulSet) GetNotificationsEvent(baseUrl *url.URL) map[string]any { + statefulSetUrl := baseUrl.JoinPath("/statefulset") + statefulSetUrl.RawQuery = fmt.Sprintf("id=%s", s.Uuid) + + return map[string]any{ + "name": s.Namespace + "/" + s.Name, + "severity": s.IcingaState.ToSeverity(), + "message": s.IcingaStateReason, + "url": statefulSetUrl.String(), + "tags": map[string]any{ + "uuid": s.Uuid.String(), + }, + "extra_tags": map[string]any{ + "namespace": s.Namespace, + "name": s.Name, + "resource": "stateful_set", + }, + } +} + func (s *StatefulSet) getIcingaState() (IcingaState, string) { if gracePeriodReason := IsWithinGracePeriod(s); gracePeriodReason != nil { return Ok, *gracePeriodReason