Skip to content

Commit

Permalink
Refactor Icinga Notifications integration
Browse files Browse the repository at this point in the history
  • Loading branch information
lippserd committed Nov 18, 2024
1 parent 511a805 commit e4223a9
Show file tree
Hide file tree
Showing 24 changed files with 782 additions and 341 deletions.
194 changes: 131 additions & 63 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/icinga/icinga-go-library/periodic"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-kubernetes/internal"
cachev1 "github.com/icinga/icinga-kubernetes/internal/cache/v1"
"github.com/icinga/icinga-kubernetes/pkg/backoff"
"github.com/icinga/icinga-kubernetes/pkg/com"
"github.com/icinga/icinga-kubernetes/pkg/daemon"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/icinga/icinga-kubernetes/pkg/notifications"
"github.com/icinga/icinga-kubernetes/pkg/retry"
schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1"
"github.com/icinga/icinga-kubernetes/pkg/sync"
syncv1 "github.com/icinga/icinga-kubernetes/pkg/sync/v1"
k8sMysql "github.com/icinga/icinga-kubernetes/schema/mysql"
"github.com/okzk/sdnotify"
Expand All @@ -35,6 +35,7 @@ import (
"k8s.io/klog/v2"
"os"
"strings"
"sync"
"time"
)

Expand Down Expand Up @@ -232,20 +233,41 @@ func main() {
}
}, periodic.Immediate()).Stop()

var nclient *notifications.Client
if err := notifications.SyncSourceConfig(ctx, db2, &cfg.Notifications); err != nil {
if err := internal.SyncNotificationsConfig(ctx, db2, &cfg.Notifications); err != nil {
klog.Fatal(err)
}

if cfg.Notifications.Url == "" {
err = notifications.RetrieveConfig(ctx, db2, &cfg.Notifications)
if cfg.Notifications.Url != "" {
klog.Infof("Sending notifications to %s", cfg.Notifications.Url)

nclient, err := notifications.NewClient("icinga-kubernetes/"+internal.Version.Version, cfg.Notifications)
if err != nil {
klog.Error(errors.Wrap(err, "cannot retrieve Icinga Notifications config"))
klog.Fatal(err)
}
}

if cfg.Notifications.Url != "" {
nclient = notifications.NewClient(db2, cfg.Notifications)
g.Go(func() error {
return nclient.Stream(ctx, cachev1.Multiplexers().Nodes().UpsertEvents().Out())
})

g.Go(func() error {
return nclient.Stream(ctx, cachev1.Multiplexers().DaemonSets().UpsertEvents().Out())
})

g.Go(func() error {
return nclient.Stream(ctx, cachev1.Multiplexers().StatefulSets().UpsertEvents().Out())
})

g.Go(func() error {
return nclient.Stream(ctx, cachev1.Multiplexers().Deployments().UpsertEvents().Out())
})

g.Go(func() error {
return nclient.Stream(ctx, cachev1.Multiplexers().ReplicaSets().UpsertEvents().Out())
})

g.Go(func() error {
return nclient.Stream(ctx, cachev1.Multiplexers().Pods().UpsertEvents().Out())
})
}

if cfg.Prometheus.Url != "" {
Expand All @@ -271,146 +293,192 @@ func main() {

return s.Run(ctx)
})
g.Go(func() error {
nodes := internal.NewMultiplex()
if cfg.Notifications.Url != "" {
nodesOut := nodes.Out()
g.Go(func() error { return nclient.Stream(ctx, nodesOut) })
}

nodesIn := nodes.In()
g.Go(func() error { return nodes.Do(ctx) })
wg := sync.WaitGroup{}

s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode)
return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(nodesIn)))
})
wg.Add(1)
g.Go(func() error {
pods := internal.NewMultiplex()
deletedPodUuids := internal.NewMultiplex()
s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
podsOut := pods.Out()
g.Go(func() error { return nclient.Stream(ctx, podsOut) })
forwardForNotifications = append(
forwardForNotifications,
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Nodes().UpsertEvents().In())),
syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Nodes().DeleteEvents().In())),
)
}

schemav1.SyncContainers(ctx, db, g, pods.Out(), deletedPodUuids.Out())
wg.Done()

return s.Run(ctx, forwardForNotifications...)
})

wg.Add(1)
g.Go(func() error {
schemav1.SyncContainers(
ctx,
db,
g,
cachev1.Multiplexers().Pods().UpsertEvents().Out(),
cachev1.Multiplexers().Pods().DeleteEvents().Out(),
)

f := schemav1.NewPodFactory(clientset)
s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New)

podsIn := pods.In()
deletedIn := deletedPodUuids.In()
wg.Done()

g.Go(func() error { return pods.Do(ctx) })
g.Go(func() error { return deletedPodUuids.Do(ctx) })

return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(podsIn)), sync.WithOnDelete(com.ForwardBulk(deletedIn)))
return s.Run(
ctx,
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Pods().UpsertEvents().In())),
syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Pods().DeleteEvents().In())),
)
})

wg.Add(1)
g.Go(func() error {
deployments := internal.NewMultiplex()
s := syncv1.NewSync(
db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
deploymentsOut := deployments.Out()
g.Go(func() error { return nclient.Stream(ctx, deploymentsOut) })
forwardForNotifications = append(
forwardForNotifications,
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Deployments().UpsertEvents().In())),
syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Deployments().DeleteEvents().In())),
)
}
s := syncv1.NewSync(db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment)

deploymentsIn := deployments.In()
g.Go(func() error { return deployments.Do(ctx) })
wg.Done()

return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(deploymentsIn)))
return s.Run(ctx, forwardForNotifications...)
})

wg.Add(1)
g.Go(func() error {
daemonSet := internal.NewMultiplex()
s := syncv1.NewSync(
db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
daemonSetOut := daemonSet.Out()
g.Go(func() error { return nclient.Stream(ctx, daemonSetOut) })
forwardForNotifications = append(
forwardForNotifications,
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().DaemonSets().UpsertEvents().In())),
syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().DaemonSets().DeleteEvents().In())),
)
}

daemonSetIn := daemonSet.In()
g.Go(func() error { return daemonSet.Do(ctx) })
wg.Done()

s := syncv1.NewSync(db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet)

return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(daemonSetIn)))
return s.Run(ctx, forwardForNotifications...)
})

wg.Add(1)
g.Go(func() error {
replicaSet := internal.NewMultiplex()
s := syncv1.NewSync(
db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
replicaSetOut := replicaSet.Out()
g.Go(func() error { return nclient.Stream(ctx, replicaSetOut) })
forwardForNotifications = append(
forwardForNotifications,
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().ReplicaSets().UpsertEvents().In())),
syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().ReplicaSets().DeleteEvents().In())),
)
}

replicaSetIn := replicaSet.In()
g.Go(func() error { return replicaSet.Do(ctx) })

s := syncv1.NewSync(db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet)
wg.Done()

return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(replicaSetIn)))
return s.Run(ctx, forwardForNotifications...)
})

wg.Add(1)
g.Go(func() error {
statefulSet := internal.NewMultiplex()
s := syncv1.NewSync(
db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet)

var forwardForNotifications []syncv1.Feature
if cfg.Notifications.Url != "" {
statefulSetOut := statefulSet.Out()
g.Go(func() error { return nclient.Stream(ctx, statefulSetOut) })
forwardForNotifications = append(
forwardForNotifications,
syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().StatefulSets().UpsertEvents().In())),
syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().StatefulSets().DeleteEvents().In())),
)
}

statefulSetIn := statefulSet.In()
g.Go(func() error { return statefulSet.Do(ctx) })

s := syncv1.NewSync(db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet)
wg.Done()

return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(statefulSetIn)))
return s.Run(ctx, forwardForNotifications...)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret)
return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent)

return s.Run(ctx, sync.WithNoDelete(), sync.WithNoWarumup())
return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup())
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob)

return s.Run(ctx)
})

g.Go(func() error {
s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress)

return s.Run(ctx)
})

g.Go(func() error {
wg.Wait()

klog.V(2).Info("Starting multiplexers")

return cachev1.Multiplexers().Run(ctx)
})

g.Go(func() error {
return db.PeriodicCleanup(ctx, database.CleanupStmt{
Table: "event",
Expand Down
20 changes: 9 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ module github.com/icinga/icinga-kubernetes

go 1.22.0

toolchain go1.23.0

require (
github.com/go-co-op/gocron v1.37.0
github.com/go-logr/logr v1.4.2
github.com/go-sql-driver/mysql v1.8.1
github.com/google/uuid v1.6.0
github.com/icinga/icinga-go-library v0.0.0-20240524093614-7048f8f10123
github.com/icinga/icinga-go-library v0.3.2-0.20241118194934-1a19cd696d37
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
github.com/okzk/sdnotify v0.0.0-20240725214427-1c1fdd37c5ac
Expand All @@ -19,7 +17,7 @@ require (
github.com/spf13/pflag v1.0.5
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/sync v0.8.0
golang.org/x/sync v0.9.0
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
Expand All @@ -29,15 +27,16 @@ require (

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/caarlos0/env/v11 v11.2.2 // indirect
github.com/creasty/defaults v1.8.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/fatih/color v1.17.0 // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/goccy/go-yaml v1.12.0 // indirect
github.com/goccy/go-yaml v1.13.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
Expand All @@ -59,13 +58,12 @@ require (
github.com/x448/float16 v0.8.4 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.24.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading

0 comments on commit e4223a9

Please sign in to comment.