Skip to content

Commit

Permalink
Add QPS and Burst configuration for Kubernetes client in controller o…
Browse files Browse the repository at this point in the history
…ptionsUpdated `Options` struct with `QPS` and `Burst` fields, added corresponding flags in `AddFlags` method, and set these values in the `Run` function for ippoollet, machinepoollet, and volumepoollet controllers. This allows configurable rate limits for Kubernetes client requests and logs the settings on startup.Suggestion: Review the changes to ensure consistency and test the new configuration options.

Signed-off-by: Benjamin Alpert <[email protected]>
  • Loading branch information
balpert89 committed Nov 26, 2024
1 parent 5c1ded1 commit f0255c7
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 61 deletions.
21 changes: 17 additions & 4 deletions broker/bucketbroker/cmd/bucketbroker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"fmt"
"net"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/ironcore-dev/ironcore/broker/bucketbroker/server"
"github.com/ironcore-dev/ironcore/broker/common"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc"

"github.com/ironcore-dev/ironcore/broker/bucketbroker/server"
"github.com/ironcore-dev/ironcore/broker/common"
"github.com/ironcore-dev/ironcore/broker/common/defaults"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"

"github.com/ironcore-dev/controller-utils/configutils"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand All @@ -24,6 +27,9 @@ type Options struct {
Kubeconfig string
Address string

QPS float32
Burst int

Namespace string
BucketPoolName string
BucketPoolSelector map[string]string
Expand All @@ -33,6 +39,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path pointing to a kubeconfig file to use.")
fs.StringVar(&o.Address, "address", "/var/run/iri-bucketbroker.sock", "Address to listen on.")

fs.Float32Var(&o.QPS, "qps", defaults.QPS, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", defaults.Burst, "Kubernetes client burst.")

fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.")
fs.StringVar(&o.BucketPoolName, "bucket-pool-name", o.BucketPoolName, "Name of the target bucket pool to pin buckets to, if any.")
fs.StringToStringVar(&o.BucketPoolSelector, "bucket-pool-selector", o.BucketPoolSelector, "Selector of the target bucket pools to pin buckets to, if any.")
Expand Down Expand Up @@ -74,6 +83,10 @@ func Run(ctx context.Context, opts Options) error {
return err
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

srv, err := server.New(cfg, server.Options{
Namespace: opts.Namespace,
BucketPoolName: opts.BucketPoolName,
Expand Down
9 changes: 9 additions & 0 deletions broker/common/defaults/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package defaults

const (
QPS float32 = 20.0
Burst int = 30
)
25 changes: 19 additions & 6 deletions broker/machinebroker/cmd/machinebroker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ import (
"net/url"

"github.com/go-logr/logr"
"github.com/ironcore-dev/controller-utils/configutils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/ironcore-dev/ironcore/broker/common"
"github.com/ironcore-dev/ironcore/broker/common/defaults"
commongrpc "github.com/ironcore-dev/ironcore/broker/common/grpc"
machinebrokerhttp "github.com/ironcore-dev/ironcore/broker/machinebroker/http"
"github.com/ironcore-dev/ironcore/broker/machinebroker/server"
iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"

"github.com/ironcore-dev/controller-utils/configutils"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand All @@ -34,6 +37,9 @@ type Options struct {
BaseURL string
BrokerDownwardAPILabels map[string]string

QPS float32
Burst int

Namespace string
MachinePoolName string
MachinePoolSelector map[string]string
Expand All @@ -48,6 +54,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringToStringVar(&o.BrokerDownwardAPILabels, "broker-downward-api-label", nil, "The labels to broker via downward API. "+
"Example is for instance to broker \"root-machine-uid\" initially obtained via \"machinepoollet.ironcore.dev/machine-uid\".")

fs.Float32Var(&o.QPS, "qps", defaults.QPS, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", defaults.Burst, "Kubernetes client burst.")

fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.")
fs.StringVar(&o.MachinePoolName, "machine-pool-name", o.MachinePoolName, "Name of the target machine pool to pin machines to, if any.")
fs.StringToStringVar(&o.MachinePoolSelector, "machine-pool-selector", o.MachinePoolSelector, "Selector of the target machine pools to pin machines to, if any.")
Expand Down Expand Up @@ -102,6 +111,10 @@ func Run(ctx context.Context, opts Options) error {
baseURL = u.String()
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

log.V(1).Info("Creating server",
"Namespace", opts.Namespace,
"MachinePoolName", opts.MachinePoolName,
Expand Down Expand Up @@ -142,7 +155,7 @@ func runServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Serve
return nil
}

func runGRPCServer(ctx context.Context, setupLog logr.Logger, log logr.Logger, srv *server.Server, opts Options) error {
func runGRPCServer(ctx context.Context, setupLog, log logr.Logger, srv *server.Server, opts Options) error {
log.V(1).Info("Cleaning up any previous socket")
if err := common.CleanupSocketIfExists(opts.Address); err != nil {
return fmt.Errorf("error cleaning up socket: %w", err)
Expand Down
21 changes: 17 additions & 4 deletions broker/volumebroker/cmd/volumebroker/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"fmt"
"net"

"github.com/ironcore-dev/controller-utils/configutils"
"github.com/ironcore-dev/ironcore/broker/common"
"github.com/ironcore-dev/ironcore/broker/volumebroker/server"
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc"

"github.com/ironcore-dev/ironcore/broker/common"
"github.com/ironcore-dev/ironcore/broker/common/defaults"
"github.com/ironcore-dev/ironcore/broker/volumebroker/server"
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1"

"github.com/ironcore-dev/controller-utils/configutils"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand All @@ -24,6 +27,9 @@ type Options struct {
Kubeconfig string
Address string

QPS float32
Burst int

Namespace string
VolumePoolName string
VolumePoolSelector map[string]string
Expand All @@ -33,6 +39,9 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "Path pointing to a kubeconfig file to use.")
fs.StringVar(&o.Address, "address", "/var/run/iri-volumebroker.sock", "Address to listen on.")

fs.Float32Var(&o.QPS, "qps", defaults.QPS, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", defaults.Burst, "Kubernetes client burst.")

fs.StringVar(&o.Namespace, "namespace", o.Namespace, "Target Kubernetes namespace to use.")
fs.StringVar(&o.VolumePoolName, "volume-pool-name", o.VolumePoolName, "Name of the target volume pool to pin volumes to, if any.")
fs.StringToStringVar(&o.VolumePoolSelector, "volume-pool-selector", o.VolumePoolSelector, "Selector of the target volume pools to pin volumes to, if any.")
Expand Down Expand Up @@ -74,6 +83,10 @@ func Run(ctx context.Context, opts Options) error {
return err
}

cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

srv, err := server.New(cfg, server.Options{
Namespace: opts.Namespace,
VolumePoolName: opts.VolumePoolName,
Expand Down
29 changes: 21 additions & 8 deletions poollet/bucketpoollet/cmd/bucketpoollet/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ironcore-dev/ironcore/poollet/bucketpoollet/bem"
bucketpoolletconfig "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/client/config"
"github.com/ironcore-dev/ironcore/poollet/bucketpoollet/controllers"
"github.com/ironcore-dev/ironcore/poollet/common/defaults"
"github.com/ironcore-dev/ironcore/poollet/irievent"
"github.com/ironcore-dev/ironcore/utils/client/config"

Expand Down Expand Up @@ -65,6 +66,10 @@ type Options struct {
RelistThreshold time.Duration

WatchFilterValue string

QPS float32
Burst int
MaxConcurrentReconciles int
}

func (o *Options) AddFlags(fs *pflag.FlagSet) {
Expand All @@ -89,6 +94,10 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.RelistThreshold, "relist-threshold", 3*time.Minute, "event channel relisting threshold.")

fs.StringVar(&o.WatchFilterValue, "watch-filter", "", "Value to filter for while watching.")

fs.Float32Var(&o.QPS, "qps", defaults.QPS, "Kubernetes client qps.")
fs.IntVar(&o.Burst, "burst", defaults.Burst, "Kubernetes client burst.")
fs.IntVar(&o.MaxConcurrentReconciles, "max-concurrent-reconciles", 1, "Maximum number of concurrent reconciles.")
}

func (o *Options) MarkFlagsRequired(cmd *cobra.Command) {
Expand Down Expand Up @@ -150,7 +159,10 @@ func Run(ctx context.Context, opts Options) error {
return fmt.Errorf("error getting config: %w", err)
}

setupLog.Info("IRI client config", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold)
setupLog.Info("IRI Client configuration", "ChannelCapacity", opts.ChannelCapacity, "RelistPeriod", opts.RelistPeriod, "RelistThreshold", opts.RelistThreshold)
cfg.QPS = opts.QPS
cfg.Burst = opts.Burst
setupLog.Info("Kubernetes Client configuration", "QPS", cfg.QPS, "Burst", cfg.Burst)

leaderElectionCfg, err := configutils.GetConfig(
configutils.Kubeconfig(opts.LeaderElectionKubeconfig),
Expand Down Expand Up @@ -213,13 +225,14 @@ func Run(ctx context.Context, opts Options) error {
}

if err := (&controllers.BucketReconciler{
EventRecorder: mgr.GetEventRecorderFor("buckets"),
Client: mgr.GetClient(),
Scheme: scheme,
BucketRuntime: bucketRuntime,
BucketClassMapper: bucketClassMapper,
BucketPoolName: opts.BucketPoolName,
WatchFilterValue: opts.WatchFilterValue,
EventRecorder: mgr.GetEventRecorderFor("buckets"),
Client: mgr.GetClient(),
Scheme: scheme,
BucketRuntime: bucketRuntime,
BucketClassMapper: bucketClassMapper,
BucketPoolName: opts.BucketPoolName,
WatchFilterValue: opts.WatchFilterValue,
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("error setting up bucket reconciler with manager: %w", err)
}
Expand Down
17 changes: 12 additions & 5 deletions poollet/bucketpoollet/controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@ import (
"fmt"

"github.com/go-logr/logr"
"github.com/ironcore-dev/controller-utils/clientutils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
iriBucket "github.com/ironcore-dev/ironcore/iri/apis/bucket"
iri "github.com/ironcore-dev/ironcore/iri/apis/bucket/v1alpha1"

irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
bucketpoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/bucketpoollet/api/v1alpha1"
"github.com/ironcore-dev/ironcore/poollet/bucketpoollet/bcm"
"github.com/ironcore-dev/ironcore/poollet/bucketpoollet/controllers/events"
ironcoreclient "github.com/ironcore-dev/ironcore/utils/client"
"github.com/ironcore-dev/ironcore/utils/predicates"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/ironcore-dev/controller-utils/clientutils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -32,6 +33,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
Expand All @@ -47,6 +49,8 @@ type BucketReconciler struct {

BucketPoolName string
WatchFilterValue string

MaxConcurrentReconciles int
}

func (r *BucketReconciler) iriBucketLabels(bucket *storagev1alpha1.Bucket) map[string]string {
Expand Down Expand Up @@ -432,7 +436,6 @@ func (r *BucketReconciler) updateStatus(ctx context.Context, log logr.Logger, bu
Endpoint: iriAccess.Endpoint,
}
}

}

base := bucket.DeepCopy()
Expand Down Expand Up @@ -466,6 +469,10 @@ func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
predicates.ResourceIsNotExternallyManaged(log),
),
).
WithOptions(
controller.Options{
MaxConcurrentReconciles: r.MaxConcurrentReconciles,
}).
Complete(r)
}

Expand Down
12 changes: 7 additions & 5 deletions poollet/bucketpoollet/controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"testing"
"time"

"github.com/ironcore-dev/controller-utils/buildutils"
"github.com/ironcore-dev/controller-utils/modutils"
corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
storageclient "github.com/ironcore-dev/ironcore/internal/client/storage"
Expand All @@ -22,8 +20,9 @@ import (
"github.com/ironcore-dev/ironcore/utils/envtest/apiserver"
"github.com/ironcore-dev/ironcore/utils/envtest/controllermanager"
"github.com/ironcore-dev/ironcore/utils/envtest/process"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/ironcore-dev/controller-utils/buildutils"
"github.com/ironcore-dev/controller-utils/modutils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -34,10 +33,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega"
)

var (
Expand Down
9 changes: 9 additions & 0 deletions poollet/common/defaults/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package defaults

const (
QPS float32 = 20.0
Burst int = 30
)
Loading

0 comments on commit f0255c7

Please sign in to comment.