diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 65764d5..c1fb5a3 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -302,7 +302,7 @@ func main() { } g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace) + s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace, clusterUuid) return s.Run(ctx) }) @@ -311,7 +311,7 @@ func main() { wg.Add(1) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode) + s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode, clusterUuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -338,7 +338,7 @@ func main() { ) f := schemav1.NewPodFactory(clientset) - s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New) + s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New, clusterUuid) wg.Done() @@ -352,7 +352,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment) + db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment, clusterUuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -371,7 +371,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet) + db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet, clusterUuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -390,7 +390,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet) + db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet, clusterUuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -409,7 +409,7 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet) + db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet, clusterUuid) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { @@ -426,60 +426,60 @@ func main() { }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService) + s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService, clusterUuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice) + s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice, clusterUuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret) + s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret, clusterUuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap) + s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap, clusterUuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent) + s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent, clusterUuid) return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup()) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc) + s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc, clusterUuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume) + s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume, clusterUuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob) + s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob, clusterUuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob) + s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob, clusterUuid) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress) + s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress, clusterUuid) return s.Run(ctx) }) diff --git a/pkg/schema/v1/config_map.go b/pkg/schema/v1/config_map.go index e59f56e..d616eec 100644 --- a/pkg/schema/v1/config_map.go +++ b/pkg/schema/v1/config_map.go @@ -31,8 +31,8 @@ func NewConfigMap() Resource { return &ConfigMap{} } -func (c *ConfigMap) Obtain(k8s kmetav1.Object) { - c.ObtainMeta(k8s) +func (c *ConfigMap) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + c.ObtainMeta(k8s, clusterUuid) configMap := k8s.(*kcorev1.ConfigMap) diff --git a/pkg/schema/v1/contracts.go b/pkg/schema/v1/contracts.go index 6716683..dafb325 100644 --- a/pkg/schema/v1/contracts.go +++ b/pkg/schema/v1/contracts.go @@ -14,11 +14,12 @@ var NameSpaceKubernetes = uuid.MustParse("3f249403-2bb0-428f-8e91-504d1fd7ddb6") type Resource interface { kmetav1.Object - Obtain(k8s kmetav1.Object) + Obtain(k8s kmetav1.Object, clusterUuid types.UUID) } type Meta struct { Uuid types.UUID + ClusterUuid types.UUID Uid ktypes.UID Namespace string Name string @@ -26,8 +27,9 @@ type Meta struct { Created types.UnixMilli } -func (m *Meta) ObtainMeta(k8s kmetav1.Object) { +func (m *Meta) ObtainMeta(k8s kmetav1.Object, clusterUuid types.UUID) { m.Uuid = EnsureUUID(k8s.GetUID()) + m.ClusterUuid = clusterUuid m.Uid = k8s.GetUID() m.Namespace = k8s.GetNamespace() m.Name = k8s.GetName() diff --git a/pkg/schema/v1/cron_job.go b/pkg/schema/v1/cron_job.go index 65cf5b9..871ca86 100644 --- a/pkg/schema/v1/cron_job.go +++ b/pkg/schema/v1/cron_job.go @@ -45,8 +45,8 @@ func NewCronJob() Resource { return &CronJob{} } -func (c *CronJob) Obtain(k8s kmetav1.Object) { - c.ObtainMeta(k8s) +func (c *CronJob) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + c.ObtainMeta(k8s, clusterUuid) cronJob := k8s.(*kbatchv1.CronJob) diff --git a/pkg/schema/v1/daemon_set.go b/pkg/schema/v1/daemon_set.go index 72c2cdd..078d0c9 100644 --- a/pkg/schema/v1/daemon_set.go +++ b/pkg/schema/v1/daemon_set.go @@ -71,8 +71,8 @@ func NewDaemonSet() Resource { return &DaemonSet{} } -func (d *DaemonSet) Obtain(k8s kmetav1.Object) { - d.ObtainMeta(k8s) +func (d *DaemonSet) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + d.ObtainMeta(k8s, clusterUuid) daemonSet := k8s.(*kappsv1.DaemonSet) diff --git a/pkg/schema/v1/deployment.go b/pkg/schema/v1/deployment.go index f79fc5b..b1d126f 100644 --- a/pkg/schema/v1/deployment.go +++ b/pkg/schema/v1/deployment.go @@ -74,8 +74,8 @@ func NewDeployment() Resource { return &Deployment{} } -func (d *Deployment) Obtain(k8s kmetav1.Object) { - d.ObtainMeta(k8s) +func (d *Deployment) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + d.ObtainMeta(k8s, clusterUuid) deployment := k8s.(*kappsv1.Deployment) diff --git a/pkg/schema/v1/endpoint.go b/pkg/schema/v1/endpoint.go index 8b3c69e..52baba7 100644 --- a/pkg/schema/v1/endpoint.go +++ b/pkg/schema/v1/endpoint.go @@ -54,8 +54,8 @@ func NewEndpointSlice() Resource { return &EndpointSlice{} } -func (e *EndpointSlice) Obtain(k8s kmetav1.Object) { - e.ObtainMeta(k8s) +func (e *EndpointSlice) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + e.ObtainMeta(k8s, clusterUuid) endpointSlice := k8s.(*kdiscoveryv1.EndpointSlice) diff --git a/pkg/schema/v1/event.go b/pkg/schema/v1/event.go index 4a8aa47..33358b0 100644 --- a/pkg/schema/v1/event.go +++ b/pkg/schema/v1/event.go @@ -32,8 +32,8 @@ func NewEvent() Resource { return &Event{} } -func (e *Event) Obtain(k8s kmetav1.Object) { - e.ObtainMeta(k8s) +func (e *Event) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + e.ObtainMeta(k8s, clusterUuid) event := k8s.(*keventsv1.Event) diff --git a/pkg/schema/v1/ingress.go b/pkg/schema/v1/ingress.go index 3cca870..c6c179f 100644 --- a/pkg/schema/v1/ingress.go +++ b/pkg/schema/v1/ingress.go @@ -57,8 +57,8 @@ func NewIngress() Resource { return &Ingress{} } -func (i *Ingress) Obtain(k8s kmetav1.Object) { - i.ObtainMeta(k8s) +func (i *Ingress) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + i.ObtainMeta(k8s, clusterUuid) ingress := k8s.(*networkingv1.Ingress) diff --git a/pkg/schema/v1/job.go b/pkg/schema/v1/job.go index 14c02ee..806fd82 100644 --- a/pkg/schema/v1/job.go +++ b/pkg/schema/v1/job.go @@ -75,8 +75,8 @@ func NewJob() Resource { return &Job{} } -func (j *Job) Obtain(k8s kmetav1.Object) { - j.ObtainMeta(k8s) +func (j *Job) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + j.ObtainMeta(k8s, clusterUuid) job := k8s.(*kbatchv1.Job) diff --git a/pkg/schema/v1/namespace.go b/pkg/schema/v1/namespace.go index 179575b..4c7a820 100644 --- a/pkg/schema/v1/namespace.go +++ b/pkg/schema/v1/namespace.go @@ -45,8 +45,8 @@ func NewNamespace() Resource { return &Namespace{} } -func (n *Namespace) Obtain(k8s kmetav1.Object) { - n.ObtainMeta(k8s) +func (n *Namespace) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + n.ObtainMeta(k8s, clusterUuid) namespace := k8s.(*kcorev1.Namespace) diff --git a/pkg/schema/v1/node.go b/pkg/schema/v1/node.go index 3b99256..f4a09d4 100644 --- a/pkg/schema/v1/node.go +++ b/pkg/schema/v1/node.go @@ -81,8 +81,8 @@ func NewNode() Resource { return &Node{} } -func (n *Node) Obtain(k8s kmetav1.Object) { - n.ObtainMeta(k8s) +func (n *Node) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + n.ObtainMeta(k8s, clusterUuid) node := k8s.(*kcorev1.Node) diff --git a/pkg/schema/v1/persistent_volume.go b/pkg/schema/v1/persistent_volume.go index 2a30dcf..02b6f26 100644 --- a/pkg/schema/v1/persistent_volume.go +++ b/pkg/schema/v1/persistent_volume.go @@ -39,8 +39,8 @@ func NewPersistentVolume() Resource { return &PersistentVolume{} } -func (p *PersistentVolume) Obtain(k8s kmetav1.Object) { - p.ObtainMeta(k8s) +func (p *PersistentVolume) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + p.ObtainMeta(k8s, clusterUuid) persistentVolume := k8s.(*kcorev1.PersistentVolume) diff --git a/pkg/schema/v1/pod.go b/pkg/schema/v1/pod.go index 9312863..988301b 100644 --- a/pkg/schema/v1/pod.go +++ b/pkg/schema/v1/pod.go @@ -115,8 +115,8 @@ func (f *PodFactory) New() Resource { return &Pod{factory: f} } -func (p *Pod) Obtain(k8s kmetav1.Object) { - p.ObtainMeta(k8s) +func (p *Pod) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + p.ObtainMeta(k8s, clusterUuid) pod := k8s.(*kcorev1.Pod) diff --git a/pkg/schema/v1/pvc.go b/pkg/schema/v1/pvc.go index fb2da18..8aa9293 100644 --- a/pkg/schema/v1/pvc.go +++ b/pkg/schema/v1/pvc.go @@ -76,8 +76,8 @@ func NewPvc() Resource { return &Pvc{} } -func (p *Pvc) Obtain(k8s kmetav1.Object) { - p.ObtainMeta(k8s) +func (p *Pvc) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + p.ObtainMeta(k8s, clusterUuid) pvc := k8s.(*kcorev1.PersistentVolumeClaim) diff --git a/pkg/schema/v1/replica_set.go b/pkg/schema/v1/replica_set.go index 4876536..26d848a 100644 --- a/pkg/schema/v1/replica_set.go +++ b/pkg/schema/v1/replica_set.go @@ -69,8 +69,8 @@ func NewReplicaSet() Resource { return &ReplicaSet{} } -func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { - r.ObtainMeta(k8s) +func (r *ReplicaSet) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + r.ObtainMeta(k8s, clusterUuid) replicaSet := k8s.(*kappsv1.ReplicaSet) diff --git a/pkg/schema/v1/secret.go b/pkg/schema/v1/secret.go index 18ecb3a..a536b6d 100644 --- a/pkg/schema/v1/secret.go +++ b/pkg/schema/v1/secret.go @@ -32,8 +32,8 @@ func NewSecret() Resource { return &Secret{} } -func (s *Secret) Obtain(k8s kmetav1.Object) { - s.ObtainMeta(k8s) +func (s *Secret) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + s.ObtainMeta(k8s, clusterUuid) secret := k8s.(*kcorev1.Secret) diff --git a/pkg/schema/v1/service.go b/pkg/schema/v1/service.go index c5ed3e2..29064cf 100644 --- a/pkg/schema/v1/service.go +++ b/pkg/schema/v1/service.go @@ -79,8 +79,8 @@ func NewService() Resource { return &Service{} } -func (s *Service) Obtain(k8s kmetav1.Object) { - s.ObtainMeta(k8s) +func (s *Service) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + s.ObtainMeta(k8s, clusterUuid) service := k8s.(*kcorev1.Service) diff --git a/pkg/schema/v1/stateful_set.go b/pkg/schema/v1/stateful_set.go index 10ea0d3..4c7bd0e 100644 --- a/pkg/schema/v1/stateful_set.go +++ b/pkg/schema/v1/stateful_set.go @@ -75,8 +75,8 @@ func NewStatefulSet() Resource { return &StatefulSet{} } -func (s *StatefulSet) Obtain(k8s kmetav1.Object) { - s.ObtainMeta(k8s) +func (s *StatefulSet) Obtain(k8s kmetav1.Object, clusterUuid types.UUID) { + s.ObtainMeta(k8s, clusterUuid) statefulSet := k8s.(*kappsv1.StatefulSet) diff --git a/pkg/sync/v1/sync.go b/pkg/sync/v1/sync.go index f1818cf..d84e432 100644 --- a/pkg/sync/v1/sync.go +++ b/pkg/sync/v1/sync.go @@ -3,6 +3,7 @@ package v1 import ( "context" "github.com/go-logr/logr" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/database" schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" @@ -12,10 +13,11 @@ import ( ) type Sync struct { - db *database.Database - informer cache.SharedIndexInformer - log logr.Logger - factory func() schemav1.Resource + db *database.Database + informer cache.SharedIndexInformer + log logr.Logger + factory func() schemav1.Resource + clusterUuid types.UUID } func NewSync( @@ -23,12 +25,14 @@ func NewSync( informer cache.SharedIndexInformer, log logr.Logger, factory func() schemav1.Resource, + clusterUuid types.UUID, ) *Sync { return &Sync{ - db: db, - informer: informer, - log: log, - factory: factory, + db: db, + informer: informer, + log: log, + factory: factory, + clusterUuid: clusterUuid, } } @@ -80,7 +84,7 @@ func (s *Sync) warmup(ctx context.Context, c *Controller) error { func (s *Sync) sync(ctx context.Context, c *Controller, features ...Feature) error { sink := NewSink(func(i *Item) interface{} { entity := s.factory() - entity.Obtain(*i.Item) + entity.Obtain(*i.Item, s.clusterUuid) return entity }, func(k interface{}) interface{} {