diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index ac4ab663..d9e2cd87 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -5,21 +5,20 @@ import ( "flag" _ "github.com/go-sql-driver/mysql" "github.com/google/uuid" + "github.com/icinga/icinga-go-library/backoff" "github.com/icinga/icinga-go-library/config" - igldatabase "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-go-library/periodic" + "github.com/icinga/icinga-go-library/retry" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/internal" cachev1 "github.com/icinga/icinga-kubernetes/internal/cache/v1" - "github.com/icinga/icinga-kubernetes/pkg/backoff" "github.com/icinga/icinga-kubernetes/pkg/cluster" - "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/daemon" - "github.com/icinga/icinga-kubernetes/pkg/database" + kdatabase "github.com/icinga/icinga-kubernetes/pkg/database" "github.com/icinga/icinga-kubernetes/pkg/metrics" "github.com/icinga/icinga-kubernetes/pkg/notifications" - "github.com/icinga/icinga-kubernetes/pkg/retry" schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" syncv1 "github.com/icinga/icinga-kubernetes/pkg/sync/v1" k8sMysql "github.com/icinga/icinga-kubernetes/schema/mysql" @@ -71,7 +70,7 @@ func main() { pflag.Parse() if showVersion { - internal.Version.Print() + internal.Version.Print("Icinga Kubernetes") os.Exit(0) } @@ -103,7 +102,7 @@ func main() { } dbLog := log.WithName("database") - db, err := database.NewFromConfig(&cfg.Database, dbLog) + kdb, err := kdatabase.NewFromConfig(&cfg.Database, dbLog) if err != nil { klog.Fatal(err) } @@ -113,11 +112,11 @@ func main() { // we need to tell systemd, that Icinga for Kubernetes finished starting up. _ = sdnotify.Ready() - if !db.Connect() { + if !kdb.Connect() { return } - hasSchema, err := dbHasSchema(db, cfg.Database.Database) + hasSchema, err := dbHasSchema(kdb, cfg.Database.Database) if err != nil { klog.Fatal(err) } @@ -131,9 +130,9 @@ func main() { ctx, func(ctx context.Context) (err error) { query := "SELECT version FROM kubernetes_schema ORDER BY id DESC LIMIT 1" - err = db.QueryRowxContext(ctx, query).Scan(&version) + err = kdb.QueryRowxContext(ctx, query).Scan(&version) if err != nil { - err = database.CantPerformQuery(err, query) + err = kdatabase.CantPerformQuery(err, query) } return }, @@ -148,8 +147,8 @@ func main() { err = retry.WithBackoff( ctx, func(ctx context.Context) (err error) { - rows, err := db.Query( - db.Rebind("SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=?"), + rows, err := kdb.Query( + kdb.Rebind("SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=?"), cfg.Database.Database, ) if err != nil { @@ -165,7 +164,7 @@ func main() { klog.Fatal(err) } - _, err := db.Exec("DROP TABLE " + tableName) + _, err := kdb.Exec("DROP TABLE " + tableName) if err != nil { klog.Fatal(err) } @@ -188,7 +187,7 @@ func main() { for _, ddl := range strings.Split(k8sMysql.Schema, ";") { if ddl = strings.TrimSpace(ddl); ddl != "" { - if _, err := db.Exec(ddl); err != nil { + if _, err := kdb.Exec(ddl); err != nil { klog.Fatal(err) } } @@ -200,7 +199,7 @@ func main() { klog.Fatal(errors.Wrap(err, "can't configure logging")) } - db2, err := igldatabase.NewDbFromConfig(&cfg.Database, logs.GetChildLogger("database"), igldatabase.RetryConnectorCallbacks{}) + db, err := database.NewDbFromConfig(&cfg.Database, logs.GetChildLogger("database"), database.RetryConnectorCallbacks{}) if err != nil { klog.Fatal("IGL_DATABASE: ", err) } @@ -218,12 +217,12 @@ func main() { ctx = cluster.NewClusterUuidContext(ctx, clusterInstance.Uuid) - stmt, _ := db.BuildUpsertStmt(clusterInstance) - if _, err := db.NamedExecContext(ctx, stmt, clusterInstance); err != nil { + stmt, _ := kdb.BuildUpsertStmt(clusterInstance) + if _, err := kdb.NamedExecContext(ctx, stmt, clusterInstance); err != nil { klog.Error(errors.Wrap(err, "can't update cluster")) } - if _, err := db.ExecContext(ctx, "DELETE FROM kubernetes_instance WHERE cluster_uuid = ?", clusterInstance.Uuid); err != nil { + if _, err := kdb.ExecContext(ctx, "DELETE FROM kubernetes_instance WHERE cluster_uuid = ?", clusterInstance.Uuid); err != nil { klog.Fatal(errors.Wrap(err, "can't delete instance")) } // ,omitempty @@ -251,14 +250,14 @@ func main() { Heartbeat: types.UnixMilli(tick.Time), } - stmt, _ := db.BuildUpsertStmt(instance) + stmt, _ := kdb.BuildUpsertStmt(instance) - if _, err := db.NamedExecContext(ctx, stmt, instance); err != nil { + if _, err := kdb.NamedExecContext(ctx, stmt, instance); err != nil { klog.Error(errors.Wrap(err, "can't update instance")) } }, periodic.Immediate()).Stop() - if err := internal.SyncNotificationsConfig(ctx, db2, &cfg.Notifications, clusterInstance.Uuid); err != nil { + if err := internal.SyncNotificationsConfig(ctx, db, &cfg.Notifications, clusterInstance.Uuid); err != nil { klog.Fatal(err) } @@ -296,7 +295,7 @@ func main() { } g.Go(func() error { - return SyncServicePods(ctx, db, factory.Core().V1().Services(), factory.Core().V1().Pods()) + return SyncServicePods(ctx, kdb, factory.Core().V1().Services(), factory.Core().V1().Pods()) }) if cfg.Prometheus.Url != "" { @@ -306,7 +305,7 @@ func main() { } promApiClient := promv1.NewAPI(promClient) - promMetricSync := metrics.NewPromMetricSync(promApiClient, db2, logs.GetChildLogger("prometheus")) + promMetricSync := metrics.NewPromMetricSync(promApiClient, db, logs.GetChildLogger("prometheus")) g.Go(func() error { return promMetricSync.Nodes(ctx, factory.Core().V1().Nodes().Informer()) @@ -318,7 +317,7 @@ func main() { } g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace) + s := syncv1.NewSync(kdb, factory.Core().V1().Namespaces().Informer(), log.WithName("namespaces"), schemav1.NewNamespace) return s.Run(ctx) }) @@ -327,14 +326,14 @@ func main() { wg.Add(1) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode) + s := syncv1.NewSync(kdb, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { forwardForNotifications = append( forwardForNotifications, - syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Nodes().UpsertEvents().In())), - syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Nodes().DeleteEvents().In())), + syncv1.WithOnUpsert(database.OnSuccessSendTo(cachev1.Multiplexers().Nodes().UpsertEvents().In())), + syncv1.WithOnDelete(database.OnSuccessSendTo(cachev1.Multiplexers().Nodes().DeleteEvents().In())), ) } @@ -347,35 +346,35 @@ func main() { g.Go(func() error { schemav1.SyncContainers( ctx, - db, + kdb, g, cachev1.Multiplexers().Pods().UpsertEvents().Out(), cachev1.Multiplexers().Pods().DeleteEvents().Out(), ) f := schemav1.NewPodFactory(clientset) - s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New) + s := syncv1.NewSync(kdb, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New) wg.Done() return s.Run( ctx, - syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Pods().UpsertEvents().In())), - syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Pods().DeleteEvents().In())), + syncv1.WithOnUpsert(database.OnSuccessSendTo(cachev1.Multiplexers().Pods().UpsertEvents().In())), + syncv1.WithOnDelete(database.OnSuccessSendTo(cachev1.Multiplexers().Pods().DeleteEvents().In())), ) }) wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment) + kdb, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { forwardForNotifications = append( forwardForNotifications, - syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Deployments().UpsertEvents().In())), - syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Deployments().DeleteEvents().In())), + syncv1.WithOnUpsert(database.OnSuccessSendTo(cachev1.Multiplexers().Deployments().UpsertEvents().In())), + syncv1.WithOnDelete(database.OnSuccessSendTo(cachev1.Multiplexers().Deployments().DeleteEvents().In())), ) } @@ -387,14 +386,14 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet) + kdb, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { forwardForNotifications = append( forwardForNotifications, - syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().DaemonSets().UpsertEvents().In())), - syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().DaemonSets().DeleteEvents().In())), + syncv1.WithOnUpsert(database.OnSuccessSendTo(cachev1.Multiplexers().DaemonSets().UpsertEvents().In())), + syncv1.WithOnDelete(database.OnSuccessSendTo(cachev1.Multiplexers().DaemonSets().DeleteEvents().In())), ) } @@ -406,14 +405,14 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet) + kdb, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { forwardForNotifications = append( forwardForNotifications, - syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().ReplicaSets().UpsertEvents().In())), - syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().ReplicaSets().DeleteEvents().In())), + syncv1.WithOnUpsert(database.OnSuccessSendTo(cachev1.Multiplexers().ReplicaSets().UpsertEvents().In())), + syncv1.WithOnDelete(database.OnSuccessSendTo(cachev1.Multiplexers().ReplicaSets().DeleteEvents().In())), ) } @@ -425,14 +424,14 @@ func main() { wg.Add(1) g.Go(func() error { s := syncv1.NewSync( - db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet) + kdb, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet) var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { forwardForNotifications = append( forwardForNotifications, - syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().StatefulSets().UpsertEvents().In())), - syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().StatefulSets().DeleteEvents().In())), + syncv1.WithOnUpsert(database.OnSuccessSendTo(cachev1.Multiplexers().StatefulSets().UpsertEvents().In())), + syncv1.WithOnDelete(database.OnSuccessSendTo(cachev1.Multiplexers().StatefulSets().DeleteEvents().In())), ) } @@ -443,63 +442,63 @@ func main() { g.Go(func() error { f := schemav1.NewServiceFactory(clientset) - s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), f.NewService) + s := syncv1.NewSync(kdb, factory.Core().V1().Services().Informer(), log.WithName("services"), f.NewService) return s.Run( ctx, - syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Services().UpsertEvents().In())), + syncv1.WithOnUpsert(database.OnSuccessSendTo(cachev1.Multiplexers().Services().UpsertEvents().In())), ) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice) + s := syncv1.NewSync(kdb, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret) + s := syncv1.NewSync(kdb, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap) + s := syncv1.NewSync(kdb, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent) + s := syncv1.NewSync(kdb, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent) return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup()) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc) + s := syncv1.NewSync(kdb, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume) + s := syncv1.NewSync(kdb, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob) + s := syncv1.NewSync(kdb, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob) + s := syncv1.NewSync(kdb, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob) return s.Run(ctx) }) g.Go(func() error { - s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress) + s := syncv1.NewSync(kdb, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress) return s.Run(ctx) }) @@ -513,7 +512,7 @@ func main() { }) g.Go(func() error { - return db.PeriodicCleanup(ctx, database.CleanupStmt{ + return kdb.PeriodicCleanup(ctx, kdatabase.CleanupStmt{ Table: "event", PK: "uuid", Column: "created", @@ -521,7 +520,7 @@ func main() { }) g.Go(func() error { - return db.PeriodicCleanup(ctx, database.CleanupStmt{ + return kdb.PeriodicCleanup(ctx, kdatabase.CleanupStmt{ Table: "prometheus_cluster_metric", PK: "(cluster_uuid, timestamp, category, name)", Column: "timestamp", @@ -529,7 +528,7 @@ func main() { }) g.Go(func() error { - return db.PeriodicCleanup(ctx, database.CleanupStmt{ + return kdb.PeriodicCleanup(ctx, kdatabase.CleanupStmt{ Table: "prometheus_node_metric", PK: "(node_uuid, timestamp, category, name)", Column: "timestamp", @@ -537,7 +536,7 @@ func main() { }) g.Go(func() error { - return db.PeriodicCleanup(ctx, database.CleanupStmt{ + return kdb.PeriodicCleanup(ctx, kdatabase.CleanupStmt{ Table: "prometheus_pod_metric", PK: "(pod_uuid, timestamp, category, name)", Column: "timestamp", @@ -545,7 +544,7 @@ func main() { }) g.Go(func() error { - return db.PeriodicCleanup(ctx, database.CleanupStmt{ + return kdb.PeriodicCleanup(ctx, kdatabase.CleanupStmt{ Table: "prometheus_container_metric", PK: "(container_uuid, timestamp, category, name)", Column: "timestamp", @@ -558,7 +557,7 @@ func main() { } // dbHasSchema queries via db whether the database dbName has a table named "kubernetes_schema". -func dbHasSchema(db *database.Database, dbName string) (bool, error) { +func dbHasSchema(db *kdatabase.Database, dbName string) (bool, error) { rows, err := db.Query( db.Rebind("SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=? AND TABLE_NAME='kubernetes_schema'"), dbName, @@ -572,7 +571,7 @@ func dbHasSchema(db *database.Database, dbName string) (bool, error) { return rows.Next(), rows.Err() } -func SyncServicePods(ctx context.Context, db *database.Database, serviceList v2.ServiceInformer, podList v2.PodInformer) error { +func SyncServicePods(ctx context.Context, db *kdatabase.Database, serviceList v2.ServiceInformer, podList v2.PodInformer) error { servicePods := make(chan any) g, ctx := errgroup.WithContext(ctx) diff --git a/internal/version.go b/internal/version.go index 0bddead7..43e246bb 100644 --- a/internal/version.go +++ b/internal/version.go @@ -1,10 +1,10 @@ package internal import ( - "github.com/icinga/icinga-kubernetes/pkg/version" + "github.com/icinga/icinga-go-library/version" ) // Version contains version and Git commit information. // // The placeholders are replaced on `git archive` using the `export-subst` attribute. -var Version = version.Version("Icinga Kubernetes", "0.2.0", "$Format:%(describe)$", "$Format:%H$") +var Version = version.Version("0.2.0", "$Format:%(describe)$", "$Format:%H$") diff --git a/pkg/backoff/backoff.go b/pkg/backoff/backoff.go deleted file mode 100644 index 6ce7beef..00000000 --- a/pkg/backoff/backoff.go +++ /dev/null @@ -1,43 +0,0 @@ -package backoff - -import ( - "math/rand" - "time" -) - -// Backoff returns the backoff duration for a specific retry attempt. -type Backoff func(uint64) time.Duration - -// NewExponentialWithJitter returns a backoff implementation that -// exponentially increases the backoff duration for each retry from min, -// never exceeding max. Some randomization is added to the backoff duration. -// It panics if min >= max. -func NewExponentialWithJitter(min, max time.Duration) Backoff { - if min <= 0 { - min = time.Millisecond * 100 - } - if max <= 0 { - max = time.Second * 10 - } - if min >= max { - panic("max must be larger than min") - } - - return func(attempt uint64) time.Duration { - e := min << attempt - if e <= 0 || e > max { - e = max - } - - return time.Duration(jitter(int64(e))) - } -} - -// jitter returns a random integer distributed in the range [n/2..n). -func jitter(n int64) int64 { - if n == 0 { - return 0 - } - - return n/2 + rand.Int63n(n/2) -} diff --git a/pkg/com/bulker.go b/pkg/com/bulker.go deleted file mode 100644 index eee6126a..00000000 --- a/pkg/com/bulker.go +++ /dev/null @@ -1,202 +0,0 @@ -package com - -import ( - "context" - "golang.org/x/sync/errgroup" - "sync" - "time" -) - -// BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. -// A call takes an item for the current chunk into account. -// Output true indicates that the state machine was reset first and the bulker -// shall finish the current chunk now (not e.g. once $size is reached) without the given item. -type BulkChunkSplitPolicy[T any] func(T) bool - -type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T] - -// NeverSplit returns a pseudo state machine which never demands splitting. -func NeverSplit[T any]() BulkChunkSplitPolicy[T] { - return neverSplit[T] -} - -// // SplitOnDupId returns a state machine which tracks the inputs' IDs. -// // Once an already seen input arrives, it demands splitting. -// func SplitOnDupId[T contracts.IDer]() BulkChunkSplitPolicy[T] { -// seenIds := map[string]struct{}{} -// -// return func(ider T) bool { -// id := ider.ID().String() -// -// _, ok := seenIds[id] -// if ok { -// seenIds = map[string]struct{}{id: {}} -// } else { -// seenIds[id] = struct{}{} -// } -// -// return ok -// } -// } - -func neverSplit[T any](T) bool { - return false -} - -// Bulker reads all values from a channel and streams them in chunks into a Bulk channel. -type Bulker[T any] struct { - ch chan []T - ctx context.Context - mu sync.Mutex -} - -// NewBulker returns a new Bulker and starts streaming. -func NewBulker[T any]( - ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], -) *Bulker[T] { - b := &Bulker[T]{ - ch: make(chan []T), - ctx: ctx, - mu: sync.Mutex{}, - } - - go b.run(ch, count, splitPolicyFactory) - - return b -} - -// Bulk returns the channel on which the bulks are delivered. -func (b *Bulker[T]) Bulk() <-chan []T { - return b.ch -} - -func (b *Bulker[T]) run(ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T]) { - defer close(b.ch) - - bufCh := make(chan T, count) - splitPolicy := splitPolicyFactory() - g, ctx := errgroup.WithContext(b.ctx) - - g.Go(func() error { - defer close(bufCh) - - for { - select { - case v, ok := <-ch: - if !ok { - return nil - } - - bufCh <- v - case <-ctx.Done(): - return ctx.Err() - } - } - }) - - g.Go(func() error { - for done := false; !done; { - buf := make([]T, 0, count) - timeout := time.After(256 * time.Millisecond) - - for drain := true; drain && len(buf) < count; { - select { - case v, ok := <-bufCh: - if !ok { - drain = false - done = true - - break - } - - if splitPolicy(v) { - if len(buf) > 0 { - b.ch <- buf - buf = make([]T, 0, count) - } - - timeout = time.After(256 * time.Millisecond) - } - - buf = append(buf, v) - case <-timeout: - drain = false - case <-ctx.Done(): - return ctx.Err() - } - } - - if len(buf) > 0 { - b.ch <- buf - } - - splitPolicy = splitPolicyFactory() - } - - return nil - }) - - // We don't expect an error here. - // We only use errgroup for the encapsulated use of sync.WaitGroup. - _ = g.Wait() -} - -// Bulk reads all values from a channel and streams them in chunks into a returned channel. -func Bulk[T any]( - ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T], -) <-chan []T { - if count <= 1 { - return oneBulk(ctx, ch) - } - - return NewBulker(ctx, ch, count, splitPolicyFactory).Bulk() -} - -// oneBulk operates just as NewBulker(ctx, ch, 1, splitPolicy).Bulk(), -// but without the overhead of the actual bulk creation with a buffer channel, timeout and BulkChunkSplitPolicy. -func oneBulk[T any](ctx context.Context, ch <-chan T) <-chan []T { - out := make(chan []T) - go func() { - defer close(out) - - for { - select { - case item, ok := <-ch: - if !ok { - return - } - - select { - case out <- []T{item}: - case <-ctx.Done(): - return - } - case <-ctx.Done(): - return - } - } - }() - - return out -} - -type ProcessBulk[T any] func(ctx context.Context, bulk []T) (err error) - -func ForwardBulk[T any](ch chan<- T) ProcessBulk[T] { - return func(ctx context.Context, rows []T) error { - for _, row := range rows { - select { - case ch <- row: - case <-ctx.Done(): - return ctx.Err() - } - } - - return nil - } -} - -var ( - _ BulkChunkSplitPolicyFactory[struct{}] = NeverSplit[struct{}] - // _ BulkChunkSplitPolicyFactory[contracts.Entity] = SplitOnDupId[contracts.Entity] -) diff --git a/pkg/com/com.go b/pkg/com/com.go deleted file mode 100644 index fcb3e8b7..00000000 --- a/pkg/com/com.go +++ /dev/null @@ -1,88 +0,0 @@ -package com - -import ( - "context" - "golang.org/x/sync/errgroup" -) - -// WaitAsync calls Wait() on the passed Waiter in a new goroutine and -// sends the first non-nil error (if any) to the returned channel. -// The returned channel is always closed when the Waiter is done. -func WaitAsync(ctx context.Context, w Waiter) <-chan error { - errs := make(chan error, 1) - - go func() { - defer close(errs) - - if e := w.Wait(); e != nil { - select { - case errs <- e: - case <-ctx.Done(): - } - - } - }() - - return errs -} - -// ErrgroupReceive adds a goroutine to the specified group that -// returns the first non-nil error (if any) from the specified channel. -// If the channel is closed, it will return nil. -func ErrgroupReceive(ctx context.Context, g *errgroup.Group, err <-chan error) { - g.Go(func() error { - select { - case e := <-err: - return e - case <-ctx.Done(): - return ctx.Err() - } - }) -} - -// CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item. -func CopyFirst[T any]( - ctx context.Context, input <-chan T, -) (first T, forward <-chan T, err error) { - var ok bool - select { - case <-ctx.Done(): - err = ctx.Err() - - return - case first, ok = <-input: - } - - if !ok { - return - } - - // Buffer of one because we receive an item and send it back immediately. - fwd := make(chan T, 1) - fwd <- first - - forward = fwd - - go func() { - defer close(fwd) - - for { - select { - case i, more := <-input: - if !more { - return - } - - select { - case fwd <- i: - case <-ctx.Done(): - return - } - case <-ctx.Done(): - return - } - } - }() - - return -} diff --git a/pkg/com/contracts.go b/pkg/com/contracts.go deleted file mode 100644 index 6559de31..00000000 --- a/pkg/com/contracts.go +++ /dev/null @@ -1,7 +0,0 @@ -package com - -// Waiter implements the Wait method, -// which blocks until execution is complete. -type Waiter interface { - Wait() error // Wait waits for execution to complete. -} diff --git a/pkg/com/counter.go b/pkg/com/counter.go deleted file mode 100644 index 52f9f7ff..00000000 --- a/pkg/com/counter.go +++ /dev/null @@ -1,48 +0,0 @@ -package com - -import ( - "sync" - "sync/atomic" -) - -// Counter implements an atomic counter. -type Counter struct { - value uint64 - mu sync.Mutex // Protects total. - total uint64 -} - -// Add adds the given delta to the counter. -func (c *Counter) Add(delta uint64) { - atomic.AddUint64(&c.value, delta) -} - -// Inc increments the counter by one. -func (c *Counter) Inc() { - c.Add(1) -} - -// Reset resets the counter to 0 and returns its previous value. -// Does not reset the total value returned from Total. -func (c *Counter) Reset() uint64 { - c.mu.Lock() - defer c.mu.Unlock() - - v := atomic.SwapUint64(&c.value, 0) - c.total += v - - return v -} - -// Total returns the total counter value. -func (c *Counter) Total() uint64 { - c.mu.Lock() - defer c.mu.Unlock() - - return c.total + c.Val() -} - -// Val returns the current counter value. -func (c *Counter) Val() uint64 { - return atomic.LoadUint64(&c.value) -} diff --git a/pkg/database/cleanup.go b/pkg/database/cleanup.go index 8194c789..6da16398 100644 --- a/pkg/database/cleanup.go +++ b/pkg/database/cleanup.go @@ -4,11 +4,10 @@ import ( "context" "fmt" "github.com/icinga/icinga-go-library/backoff" + "github.com/icinga/icinga-go-library/com" + "github.com/icinga/icinga-go-library/periodic" "github.com/icinga/icinga-go-library/retry" "github.com/icinga/icinga-go-library/types" - "github.com/icinga/icinga-kubernetes/pkg/com" - "github.com/icinga/icinga-kubernetes/pkg/periodic" - "golang.org/x/sync/errgroup" "time" ) @@ -107,8 +106,6 @@ type cleanupWhere struct { } func (db *Database) PeriodicCleanup(ctx context.Context, stmt CleanupStmt) error { - g, ctxCleanup := errgroup.WithContext(ctx) - errs := make(chan error, 1) defer close(errs) @@ -129,7 +126,10 @@ func (db *Database) PeriodicCleanup(ctx context.Context, stmt CleanupStmt) error } }, periodic.Immediate()).Stop() - com.ErrgroupReceive(ctxCleanup, g, errs) - - return g.Wait() + select { + case err := <-errs: + return err + case <-ctx.Done(): + return ctx.Err() + } } diff --git a/pkg/database/database.go b/pkg/database/database.go index fd64aa9a..d0be7674 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -5,12 +5,12 @@ import ( "fmt" "github.com/go-logr/logr" "github.com/go-sql-driver/mysql" + "github.com/icinga/icinga-go-library/backoff" + "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/periodic" + "github.com/icinga/icinga-go-library/retry" "github.com/icinga/icinga-go-library/strcase" - "github.com/icinga/icinga-kubernetes/pkg/backoff" - "github.com/icinga/icinga-kubernetes/pkg/com" - "github.com/icinga/icinga-kubernetes/pkg/periodic" - "github.com/icinga/icinga-kubernetes/pkg/retry" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" "github.com/pkg/errors" @@ -634,7 +634,7 @@ func (db *Database) YieldAll(ctx context.Context, factoryFunc func() (interface{ return nil }) - return entities, com.WaitAsync(ctx, g) + return entities, com.WaitAsync(g) } func (db *Database) periodicLog(ctx context.Context, query string, counter *com.Counter) periodic.Stopper { diff --git a/pkg/database/driver.go b/pkg/database/driver.go index 1737d118..bbe4f866 100644 --- a/pkg/database/driver.go +++ b/pkg/database/driver.go @@ -7,8 +7,8 @@ import ( "fmt" "github.com/go-logr/logr" "github.com/go-sql-driver/mysql" - "github.com/icinga/icinga-kubernetes/pkg/backoff" - "github.com/icinga/icinga-kubernetes/pkg/retry" + "github.com/icinga/icinga-go-library/backoff" + "github.com/icinga/icinga-go-library/retry" "github.com/jmoiron/sqlx" "github.com/pkg/errors" "time" @@ -38,7 +38,7 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1), retry.Settings{ Timeout: timeout, - OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) { if lastErr == nil || err.Error() != lastErr.Error() { c.driver.Logger.Info("Can't connect to database. Retrying", "error", err) } diff --git a/pkg/database/features.go b/pkg/database/features.go index cafb26a7..991d2c5f 100644 --- a/pkg/database/features.go +++ b/pkg/database/features.go @@ -1,7 +1,7 @@ package database import ( - "github.com/icinga/icinga-kubernetes/pkg/com" + "github.com/icinga/icinga-go-library/database" ) type Feature func(*Features) @@ -9,7 +9,7 @@ type Feature func(*Features) type Features struct { blocking bool cascading bool - onSuccess com.ProcessBulk[any] + onSuccess database.OnSuccess[any] } func NewFeatures(features ...Feature) *Features { @@ -33,7 +33,7 @@ func WithCascading() Feature { } } -func WithOnSuccess(fn com.ProcessBulk[any]) Feature { +func WithOnSuccess(fn database.OnSuccess[any]) Feature { return func(f *Features) { f.onSuccess = fn } diff --git a/pkg/periodic/periodic.go b/pkg/periodic/periodic.go deleted file mode 100644 index 6ef5ceb8..00000000 --- a/pkg/periodic/periodic.go +++ /dev/null @@ -1,123 +0,0 @@ -package periodic - -import ( - "context" - "sync" - "time" -) - -// Option configures Start. -type Option interface { - apply(*periodic) -} - -// Stopper implements the Stop method, -// which stops a periodic task from Start(). -type Stopper interface { - Stop() // Stops a periodic task. -} - -// Tick is the value for periodic task callbacks that -// contains the time of the tick and -// the time elapsed since the start of the periodic task. -type Tick struct { - Elapsed time.Duration - Time time.Time -} - -// Immediate starts the periodic task immediately instead of after the first tick. -func Immediate() Option { - return optionFunc(func(p *periodic) { - p.immediate = true - }) -} - -// OnStop configures a callback that is executed when a periodic task is stopped or canceled. -func OnStop(f func(Tick)) Option { - return optionFunc(func(p *periodic) { - p.onStop = f - }) -} - -// Start starts a periodic task with a ticker at the specified interval, -// which executes the given callback after each tick. -// Pending tasks do not overlap, but could start immediately if -// the previous task(s) takes longer than the interval. -// Call Stop() on the return value in order to stop the ticker and to release associated resources. -// The interval must be greater than zero. -func Start(ctx context.Context, interval time.Duration, callback func(Tick), options ...Option) Stopper { - t := &periodic{ - interval: interval, - callback: callback, - } - - for _, option := range options { - option.apply(t) - } - - ctx, cancelCtx := context.WithCancel(ctx) - - start := time.Now() - - go func() { - done := false - - if !t.immediate { - select { - case <-time.After(interval): - case <-ctx.Done(): - done = true - } - } - - if !done { - ticker := time.NewTicker(t.interval) - defer ticker.Stop() - - for tickTime := time.Now(); !done; { - t.callback(Tick{ - Elapsed: tickTime.Sub(start), - Time: tickTime, - }) - - select { - case tickTime = <-ticker.C: - case <-ctx.Done(): - done = true - } - } - } - - if t.onStop != nil { - now := time.Now() - t.onStop(Tick{ - Elapsed: now.Sub(start), - Time: now, - }) - } - }() - - return stoperFunc(func() { - t.stop.Do(cancelCtx) - }) -} - -type optionFunc func(*periodic) - -func (f optionFunc) apply(p *periodic) { - f(p) -} - -type stoperFunc func() - -func (f stoperFunc) Stop() { - f() -} - -type periodic struct { - interval time.Duration - callback func(Tick) - immediate bool - stop sync.Once - onStop func(Tick) -} diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go deleted file mode 100644 index b96edf37..00000000 --- a/pkg/retry/retry.go +++ /dev/null @@ -1,134 +0,0 @@ -package retry - -import ( - "context" - "github.com/icinga/icinga-kubernetes/pkg/backoff" - "github.com/pkg/errors" - "net" - "syscall" - "time" -) - -// RetryableFunc is a retryable function. -type RetryableFunc func(context.Context) error - -// IsRetryable checks whether a new attempt can be started based on the error passed. -type IsRetryable func(error) bool - -// Settings aggregates optional settings for WithBackoff. -type Settings struct { - // Timeout lets WithBackoff give up once elapsed (if >0). - Timeout time.Duration - // OnError is called if an error occurs. - OnError func(elapsed time.Duration, attempt uint64, err, lastErr error) - // OnSuccess is called once the operation succeeds. - OnSuccess func(elapsed time.Duration, attempt uint64, lastErr error) -} - -// WithBackoff retries the passed function if it fails and the error allows it to retry. -// The specified backoff policy is used to determine how long to sleep between attempts. -func WithBackoff( - ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, settings Settings, -) (err error) { - parentCtx := ctx - - if settings.Timeout > 0 { - var cancelCtx context.CancelFunc - ctx, cancelCtx = context.WithTimeout(ctx, settings.Timeout) - defer cancelCtx() - } - - start := time.Now() - for attempt := uint64(0); ; /* true */ attempt++ { - prevErr := err - - if err = retryableFunc(ctx); err == nil { - if settings.OnSuccess != nil { - settings.OnSuccess(time.Since(start), attempt, prevErr) - } - - return - } - - if settings.OnError != nil { - settings.OnError(time.Since(start), attempt, err, prevErr) - } - - isRetryable := retryable(err) - - if prevErr != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { - err = prevErr - } - - if !isRetryable { - err = errors.Wrap(err, "can't retry") - - return - } - - sleep := b(attempt) - select { - case <-ctx.Done(): - if outerErr := parentCtx.Err(); outerErr != nil { - err = errors.Wrap(outerErr, "outer context canceled") - } else { - if err == nil { - err = ctx.Err() - } - err = errors.Wrap(err, "can't retry") - } - - return - case <-time.After(sleep): - } - } -} - -// Retryable returns true for common errors that are considered retryable, -// i.e. temporary, timeout, DNS, connection refused and reset, host down and unreachable and -// network down and unreachable errors. -func Retryable(err error) bool { - var temporary interface { - Temporary() bool - } - if errors.As(err, &temporary) && temporary.Temporary() { - return true - } - - var timeout interface { - Timeout() bool - } - if errors.As(err, &timeout) && timeout.Timeout() { - return true - } - - var dnsError *net.DNSError - if errors.As(err, &dnsError) { - return true - } - - var opError *net.OpError - if errors.As(err, &opError) { - // OpError provides Temporary() and Timeout(), but not Unwrap(), - // so we have to extract the underlying error ourselves to also check for ECONNREFUSED, - // which is not considered temporary or timed out by Go. - err = opError.Err - } - if errors.Is(err, syscall.ECONNREFUSED) { - // syscall errors provide Temporary() and Timeout(), - // which do not include ECONNREFUSED, so we check this ourselves. - return true - } - if errors.Is(err, syscall.ECONNRESET) { - // ECONNRESET is treated as a temporary error by Go only if it comes from calling accept. - return true - } - if errors.Is(err, syscall.EHOSTDOWN) || errors.Is(err, syscall.EHOSTUNREACH) { - return true - } - if errors.Is(err, syscall.ENETDOWN) || errors.Is(err, syscall.ENETUNREACH) { - return true - } - - return false -} diff --git a/pkg/schema/v1/container.go b/pkg/schema/v1/container.go index acb78844..527ef225 100644 --- a/pkg/schema/v1/container.go +++ b/pkg/schema/v1/container.go @@ -6,8 +6,8 @@ import ( "errors" "fmt" "github.com/go-co-op/gocron" + "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-go-library/types" - "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/database" "golang.org/x/sync/errgroup" "io" @@ -464,7 +464,7 @@ func SyncContainers(ctx context.Context, db *database.Database, g *errgroup.Grou err := make(chan error, 1) err <- warmup(ctx, db) close(err) - com.ErrgroupReceive(ctx, g, err) + com.ErrgroupReceive(g, err) // Use buffered channel here not to block the goroutines, as they can stream container ids // from multiple pods concurrently. @@ -507,7 +507,7 @@ func SyncContainers(ctx context.Context, db *database.Database, g *errgroup.Grou entities, errs := db.YieldAll(ctx, func() (interface{}, error) { return &Container{}, nil }, query, meta) - com.ErrgroupReceive(ctx, g, errs) + com.ErrgroupReceive(g, errs) g.Go(func() error { defer runtime.HandleCrash() @@ -597,7 +597,7 @@ func warmup(ctx context.Context, db *database.Database) error { entities, errs := db.YieldAll(ctx, func() (interface{}, error) { return &ContainerLog{}, nil }, db.BuildSelectStmt(ContainerLog{}, ContainerLog{})) - com.ErrgroupReceive(ctx, g, errs) + com.ErrgroupReceive(g, errs) g.Go(func() error { defer runtime.HandleCrash() diff --git a/pkg/sync/v1/features.go b/pkg/sync/v1/features.go index 73749d91..d0f16083 100644 --- a/pkg/sync/v1/features.go +++ b/pkg/sync/v1/features.go @@ -1,14 +1,14 @@ package v1 -import "github.com/icinga/icinga-kubernetes/pkg/com" +import "github.com/icinga/icinga-go-library/database" type Feature func(*Features) type Features struct { noDelete bool noWarmup bool - onDelete com.ProcessBulk[any] - onUpsert com.ProcessBulk[any] + onDelete database.OnSuccess[any] + onUpsert database.OnSuccess[any] } func NewFeatures(features ...Feature) *Features { @@ -28,11 +28,11 @@ func (f *Features) NoWarmup() bool { return f.noWarmup } -func (f *Features) OnDelete() com.ProcessBulk[any] { +func (f *Features) OnDelete() database.OnSuccess[any] { return f.onDelete } -func (f *Features) OnUpsert() com.ProcessBulk[any] { +func (f *Features) OnUpsert() database.OnSuccess[any] { return f.onUpsert } @@ -48,13 +48,13 @@ func WithNoWarumup() Feature { } } -func WithOnDelete(fn com.ProcessBulk[any]) Feature { +func WithOnDelete(fn database.OnSuccess[any]) Feature { return func(f *Features) { f.onDelete = fn } } -func WithOnUpsert(fn com.ProcessBulk[any]) Feature { +func WithOnUpsert(fn database.OnSuccess[any]) Feature { return func(f *Features) { f.onUpsert = fn } diff --git a/pkg/sync/v1/sync.go b/pkg/sync/v1/sync.go index 0157c9de..c58a1133 100644 --- a/pkg/sync/v1/sync.go +++ b/pkg/sync/v1/sync.go @@ -3,8 +3,8 @@ package v1 import ( "context" "github.com/go-logr/logr" + "github.com/icinga/icinga-go-library/com" "github.com/icinga/icinga-kubernetes/pkg/cluster" - "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/database" schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" "golang.org/x/sync/errgroup" @@ -58,7 +58,7 @@ func (s *Sync) warmup(ctx context.Context, c *Controller) error { }, query, meta) // Let errors from YieldAll() cancel the group. - com.ErrgroupReceive(ctx, g, errs) + com.ErrgroupReceive(g, errs) g.Go(func() error { defer runtime.HandleCrash() diff --git a/pkg/version/version.go b/pkg/version/version.go deleted file mode 100644 index c83d5b1f..00000000 --- a/pkg/version/version.go +++ /dev/null @@ -1,183 +0,0 @@ -package version - -import ( - "bufio" - "errors" - "fmt" - "os" - "runtime" - "runtime/debug" - "strconv" - "strings" -) - -type VersionInfo struct { - Name string - Version string - Commit string -} - -// Version determines version and commit information based on multiple data sources: -// - Version information dynamically added by `git archive` in the remaining to parameters. -// - A hardcoded version number passed as first parameter. -// - Commit information added to the binary by `go build`. -// -// It's supposed to be called like this in combination with setting the `export-subst` attribute for the corresponding -// file in .gitattributes: -// -// var Version = version.Version("acme", 1.0.0-rc2", "$Format:%(describe)$", "$Format:%H$") -// -// When exported using `git archive`, the placeholders are replaced in the file and this version information is -// preferred. Otherwise the hardcoded version is used and augmented with commit information from the build metadata. -func Version(name, version, gitDescribe, gitHash string) *VersionInfo { - const hashLen = 7 // Same truncation length for the commit hash as used by git describe. - - if !strings.HasPrefix(gitDescribe, "$") && !strings.HasPrefix(gitHash, "$") { - if strings.HasPrefix(gitDescribe, "%") { - // Only Git 2.32+ supports %(describe), older versions don't expand it but keep it as-is. - // Fall back to the hardcoded version augmented with the commit hash. - gitDescribe = version - - if len(gitHash) >= hashLen { - gitDescribe += "-g" + gitHash[:hashLen] - } - } - - return &VersionInfo{ - Name: name, - Version: gitDescribe, - Commit: gitHash, - } - } else { - commit := "" - - if info, ok := debug.ReadBuildInfo(); ok { - modified := false - - for _, setting := range info.Settings { - switch setting.Key { - case "vcs.revision": - commit = setting.Value - case "vcs.modified": - modified, _ = strconv.ParseBool(setting.Value) - } - } - - if len(commit) >= hashLen { - version += "-g" + commit[:hashLen] - - if modified { - version += "-dirty" - commit += " (modified)" - } - } - } - - return &VersionInfo{ - Name: name, - Version: version, - Commit: commit, - } - } -} - -// Print writes verbose version output to stdout. -func (v *VersionInfo) Print() { - fmt.Println(v.Name, "version:", v.Version) - fmt.Println() - - fmt.Println("Build information:") - fmt.Printf(" Go version: %s (%s, %s)\n", runtime.Version(), runtime.GOOS, runtime.GOARCH) - if v.Commit != "" { - fmt.Println(" Git commit:", v.Commit) - } - - if r, err := readOsRelease(); err == nil { - fmt.Println() - fmt.Println("System information:") - fmt.Println(" Platform:", r.Name) - fmt.Println(" Platform version:", r.DisplayVersion()) - } -} - -// osRelease contains the information obtained from the os-release file. -type osRelease struct { - Name string - Version string - VersionId string - BuildId string -} - -// DisplayVersion returns the most suitable version information for display purposes. -func (o *osRelease) DisplayVersion() string { - if o.Version != "" { - // Most distributions set VERSION - return o.Version - } else if o.VersionId != "" { - // Some only set VERSION_ID (Alpine Linux for example) - return o.VersionId - } else if o.BuildId != "" { - // Others only set BUILD_ID (Arch Linux for example) - return o.BuildId - } else { - return "(unknown)" - } -} - -// readOsRelease reads and parses the os-release file. -func readOsRelease() (*osRelease, error) { - for _, path := range []string{"/etc/os-release", "/usr/lib/os-release"} { - f, err := os.Open(path) - if err != nil { - if os.IsNotExist(err) { - continue // Try next path. - } else { - return nil, err - } - } - - o := &osRelease{ - Name: "Linux", // Suggested default as per os-release(5) man page. - } - - scanner := bufio.NewScanner(f) - for scanner.Scan() { - line := scanner.Text() - if strings.HasPrefix(line, "#") { - continue // Ignore comment. - } - - parts := strings.SplitN(line, "=", 2) - if len(parts) != 2 { - continue // Ignore empty or possibly malformed line. - } - - key := parts[0] - val := parts[1] - - // Unquote strings. This isn't fully compliant with the specification which allows using some shell escape - // sequences. However, typically quotes are only used to allow whitespace within the value. - if len(val) >= 2 && (val[0] == '"' || val[0] == '\'') && val[0] == val[len(val)-1] { - val = val[1 : len(val)-1] - } - - switch key { - case "NAME": - o.Name = val - case "VERSION": - o.Version = val - case "VERSION_ID": - o.VersionId = val - case "BUILD_ID": - o.BuildId = val - } - } - if err := scanner.Err(); err != nil { - return nil, err - } - - return o, nil - } - - return nil, errors.New("os-release file not found") -}