Skip to content

Commit

Permalink
State
Browse files Browse the repository at this point in the history
  • Loading branch information
videlov committed Jan 23, 2025
1 parent 92bfb21 commit e357e4d
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 105 deletions.
5 changes: 4 additions & 1 deletion controllers/istio_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/kyma-project/istio/operator/internal/validation"

"github.com/kyma-project/istio/operator/pkg/lib/sidecars"
"github.com/kyma-project/istio/operator/pkg/lib/sidecars/pods"

"github.com/kyma-project/istio/operator/internal/described_errors"
"github.com/kyma-project/istio/operator/internal/reconciliations/istio/configuration"
Expand Down Expand Up @@ -57,9 +58,11 @@ func NewController(mgr manager.Manager, reconciliationInterval time.Duration) *I
merger := istiooperator.NewDefaultIstioMerger()

statusHandler := status.NewStatusHandler(mgr.GetClient())
logger := mgr.GetLogger()
podsLister := pods.NewPods(mgr.GetClient(), &logger)
restarters := []restarter.Restarter{
restarter.NewIngressGatewayRestarter(mgr.GetClient(), []predicates.IngressGatewayPredicate{}, statusHandler),
restarter.NewSidecarsRestarter(mgr.GetLogger(), mgr.GetClient(), &merger, sidecars.NewProxyRestarter(), statusHandler),
restarter.NewSidecarsRestarter(mgr.GetLogger(), mgr.GetClient(), &merger, sidecars.NewProxyRestarter(mgr.GetClient(), podsLister, &logger), statusHandler),
}

return &IstioReconciler{
Expand Down
2 changes: 1 addition & 1 deletion internal/restarter/sidecars.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *SidecarRestarter) Restart(ctx context.Context, istioCR *v1alpha2.Istio)
return described_errors.NewDescribedError(err, errorDescription), false
}

warnings, hasMorePods, err := s.ProxyRestarter.RestartProxies(ctx, s.Client, expectedImage, expectedResources, istioCR, &s.Log)
warnings, hasMorePods, err := s.ProxyRestarter.RestartProxies(ctx, expectedImage, expectedResources, istioCR)
if err != nil {
s.Log.Error(err, "Failed to reset proxy")
s.StatusHandler.SetCondition(istioCR, v1alpha2.NewReasonWithMessage(v1alpha2.ConditionReasonProxySidecarRestartFailed))
Expand Down
9 changes: 6 additions & 3 deletions internal/restarter/sidecars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
)

var _ = Describe("SidecarsRestarter reconciliation", func() {
logger := logr.Discard()
It("should fail proxy reset if Istio pods do not match target version", func() {
// given
numTrustedProxies := 1
Expand All @@ -48,8 +49,10 @@ var _ = Describe("SidecarsRestarter reconciliation", func() {
istiod := createPod("istiod", gatherer.IstioNamespace, "discovery", "1.16.0")
fakeClient := createFakeClient(&istioCr, istiod)
statusHandler := status.NewStatusHandler(fakeClient)
podsLister := pods.NewPods(fakeClient, &logger)
proxyRestarter := sidecars.NewProxyRestarter(fakeClient, podsLister, &logger)
sidecarsRestarter := restarter.NewSidecarsRestarter(logr.Discard(), createFakeClient(&istioCr, istiod),
&MergerMock{"1.16.1-distroless"}, sidecars.NewProxyRestarter(), statusHandler)
&MergerMock{"1.16.1-distroless"}, proxyRestarter, statusHandler)

// when
err, requeue := sidecarsRestarter.Restart(context.Background(), &istioCr)
Expand Down Expand Up @@ -307,10 +310,10 @@ type proxyRestarterMock struct {
err error
}

func (p *proxyRestarterMock) RestartProxies(_ context.Context, _ client.Client, _ predicates.SidecarImage, _ v1.ResourceRequirements, _ *v1alpha2.Istio, _ *logr.Logger) ([]restart.RestartWarning, bool, error) {
func (p *proxyRestarterMock) RestartProxies(_ context.Context, _ predicates.SidecarImage, _ v1.ResourceRequirements, _ *v1alpha2.Istio) ([]restart.RestartWarning, bool, error) {
return p.restartWarnings, p.hasMorePods, p.err
}

func (p *proxyRestarterMock) RestartWithPredicates(ctx context.Context, c client.Client, preds []predicates.SidecarProxyPredicate, limits *pods.PodsRestartLimits, logger *logr.Logger) ([]restart.RestartWarning, bool, error) {
func (p *proxyRestarterMock) RestartWithPredicates(_ context.Context, preds []predicates.SidecarProxyPredicate, limits *pods.PodsRestartLimits) ([]restart.RestartWarning, bool, error) {
return p.restartWarnings, p.hasMorePods, p.err
}
121 changes: 69 additions & 52 deletions pkg/lib/sidecars/pods/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,27 @@ func NewPodsRestartLimits(restartLimit, listLimit int) *PodsRestartLimits {
}
}

func listRunningPods(ctx context.Context, c client.Client, listLimit int, continueToken string) (*v1.PodList, error) {
podList := &v1.PodList{}

err := retry.RetryOnError(retry.DefaultRetry, func() error {
listOps := []client.ListOption{
client.MatchingFieldsSelector{Selector: fields.OneTermEqualSelector("status.phase", string(v1.PodRunning))},
client.Limit(listLimit),
}
if continueToken != "" {
listOps = append(listOps, client.Continue(continueToken))
}
return c.List(ctx, podList, listOps...)
})

return podList, err
type PodsGetter interface {
GetPodsToRestart(ctx context.Context, preds []predicates.SidecarProxyPredicate, limits *PodsRestartLimits) (*v1.PodList, error)
GetAllInjectedPods(context context.Context) (outputPodList *v1.PodList, err error)
}

func getSidecarPods(ctx context.Context, c client.Client, logger *logr.Logger, listLimit int, continueToken string) (*v1.PodList, error) {
podList, err := listRunningPods(ctx, c, listLimit, continueToken)
if err != nil {
return nil, err
}

logger.Info("Got running pods for proxy restart", "number of pods", len(podList.Items), "has more pods", podList.Continue != "")

podsWithSidecar := &v1.PodList{}
podsWithSidecar.Continue = podList.Continue
type Pods struct {
k8sClient client.Client
logger *logr.Logger
}

for _, pod := range podList.Items {
if predicates.IsReadyWithIstioAnnotation(pod) {
podsWithSidecar.Items = append(podsWithSidecar.Items, pod)
}
func NewPods(k8sClient client.Client, logger *logr.Logger) *Pods {
return &Pods{
k8sClient: k8sClient,
logger: logger,
}

logger.Info("Filtered pods with Istio sidecar", "number of pods", len(podsWithSidecar.Items))
return podsWithSidecar, nil
}

func GetPodsToRestart(ctx context.Context, c client.Client, preds []predicates.SidecarProxyPredicate, limits *PodsRestartLimits, logger *logr.Logger) (*v1.PodList, error) {
func (p *Pods) GetPodsToRestart(ctx context.Context, preds []predicates.SidecarProxyPredicate, limits *PodsRestartLimits) (*v1.PodList, error) {
podsToRestart := &v1.PodList{}
for while := true; while; {
podsWithSidecar, err := getSidecarPods(ctx, c, logger, limits.podsToListLimit, podsToRestart.Continue)
podsWithSidecar, err := getSidecarPods(ctx, p.k8sClient, p.logger, limits.podsToListLimit, podsToRestart.Continue)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -97,35 +76,21 @@ func GetPodsToRestart(ctx context.Context, c client.Client, preds []predicates.S
}

if len(podsToRestart.Items) > 0 {
logger.Info("Pods to restart", "number of pods", len(podsToRestart.Items), "has more pods", podsToRestart.Continue != "")
p.logger.Info("Pods to restart", "number of pods", len(podsToRestart.Items), "has more pods", podsToRestart.Continue != "")
} else {
logger.Info("No pods to restart with matching predicates")
p.logger.Info("No pods to restart with matching predicates")
}

return podsToRestart, nil
}

func containsSidecar(pod v1.Pod) bool {
// If the pod has one container it is not injected
// This skips IngressGateway and EgressGateway pods, as those only have istio-proxy
if len(pod.Spec.Containers) == 1 {
return false
}
for _, container := range pod.Spec.Containers {
if container.Name == istioSidecarContainerName {
return true
}
}
return false
}

func GetAllInjectedPods(ctx context.Context, k8sclient client.Client) (outputPodList *v1.PodList, err error) {
func (p *Pods) GetAllInjectedPods(ctx context.Context) (outputPodList *v1.PodList, err error) {
podList := &v1.PodList{}
outputPodList = &v1.PodList{}
outputPodList.Items = make([]v1.Pod, len(podList.Items))

err = retry.RetryOnError(retry.DefaultRetry, func() error {
return k8sclient.List(ctx, podList, &client.ListOptions{})
return p.k8sClient.List(ctx, podList, &client.ListOptions{})
})
if err != nil {
return podList, err
Expand All @@ -139,3 +104,55 @@ func GetAllInjectedPods(ctx context.Context, k8sclient client.Client) (outputPod

return outputPodList, nil
}

func listRunningPods(ctx context.Context, c client.Client, listLimit int, continueToken string) (*v1.PodList, error) {
podList := &v1.PodList{}

err := retry.RetryOnError(retry.DefaultRetry, func() error {
listOps := []client.ListOption{
client.MatchingFieldsSelector{Selector: fields.OneTermEqualSelector("status.phase", string(v1.PodRunning))},
client.Limit(listLimit),
}
if continueToken != "" {
listOps = append(listOps, client.Continue(continueToken))
}
return c.List(ctx, podList, listOps...)
})

return podList, err
}

func getSidecarPods(ctx context.Context, c client.Client, logger *logr.Logger, listLimit int, continueToken string) (*v1.PodList, error) {
podList, err := listRunningPods(ctx, c, listLimit, continueToken)
if err != nil {
return nil, err
}

logger.Info("Got running pods for proxy restart", "number of pods", len(podList.Items), "has more pods", podList.Continue != "")

podsWithSidecar := &v1.PodList{}
podsWithSidecar.Continue = podList.Continue

for _, pod := range podList.Items {
if predicates.IsReadyWithIstioAnnotation(pod) {
podsWithSidecar.Items = append(podsWithSidecar.Items, pod)
}
}

logger.Info("Filtered pods with Istio sidecar", "number of pods", len(podsWithSidecar.Items))
return podsWithSidecar, nil
}

func containsSidecar(pod v1.Pod) bool {
// If the pod has one container it is not injected
// This skips IngressGateway and EgressGateway pods, as those only have istio-proxy
if len(pod.Spec.Containers) == 1 {
return false
}
for _, container := range pod.Spec.Containers {
if container.Name == istioSidecarContainerName {
return true
}
}
return false
}
10 changes: 7 additions & 3 deletions pkg/lib/sidecars/pods/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ var _ = Describe("GetPodsToRestart", func() {
for _, tt := range tests {
It(tt.name, func() {
tt.predicates = append(tt.predicates, predicates.NewImageResourcesPredicate(expectedImage, helpers.DefaultSidecarResources))
podList, err := pods.GetPodsToRestart(ctx, tt.c, tt.predicates, tt.limits, &logger)
podsLister := pods.NewPods(tt.c, &logger)
podList, err := podsLister.GetPodsToRestart(ctx, tt.predicates, tt.limits)
Expect(err).NotTo(HaveOccurred())
tt.assertFunc(podList)
})
Expand Down Expand Up @@ -339,7 +340,8 @@ var _ = Describe("GetPodsToRestart", func() {
for _, tt := range tests {
It(tt.name, func() {
expectedImage := predicates.NewSidecarImage("istio", "1.10.0")
podList, err := pods.GetPodsToRestart(ctx, tt.c, []predicates.SidecarProxyPredicate{predicates.NewImageResourcesPredicate(expectedImage, helpers.DefaultSidecarResources)}, pods.NewPodsRestartLimits(5, 5), &logger)
podsLister := pods.NewPods(tt.c, &logger)
podList, err := podsLister.GetPodsToRestart(ctx, []predicates.SidecarProxyPredicate{predicates.NewImageResourcesPredicate(expectedImage, helpers.DefaultSidecarResources)}, pods.NewPodsRestartLimits(5, 5))
Expect(err).NotTo(HaveOccurred())
tt.assertFunc(podList)
})
Expand All @@ -349,6 +351,7 @@ var _ = Describe("GetPodsToRestart", func() {

var _ = Describe("GetAllInjectedPods", func() {
ctx := context.Background()
logger := logr.Discard()

tests := []struct {
name string
Expand Down Expand Up @@ -377,7 +380,8 @@ var _ = Describe("GetAllInjectedPods", func() {
}
for _, tt := range tests {
It(tt.name, func() {
podList, err := pods.GetAllInjectedPods(ctx, tt.c)
podsLister := pods.NewPods(tt.c, &logger)
podList, err := podsLister.GetAllInjectedPods(ctx)
Expect(err).NotTo(HaveOccurred())
tt.assertFunc(podList)
})
Expand Down
55 changes: 31 additions & 24 deletions pkg/lib/sidecars/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,87 +20,94 @@ const (
)

type ProxyRestarter interface {
RestartProxies(ctx context.Context, c client.Client, expectedImage predicates.SidecarImage, expectedResources v1.ResourceRequirements, istioCR *v1alpha2.Istio, logger *logr.Logger) ([]restart.RestartWarning, bool, error)
RestartWithPredicates(ctx context.Context, c client.Client, preds []predicates.SidecarProxyPredicate, limits *pods.PodsRestartLimits, logger *logr.Logger) ([]restart.RestartWarning, bool, error)
RestartProxies(ctx context.Context, expectedImage predicates.SidecarImage, expectedResources v1.ResourceRequirements, istioCR *v1alpha2.Istio) ([]restart.RestartWarning, bool, error)
RestartWithPredicates(ctx context.Context, preds []predicates.SidecarProxyPredicate, limits *pods.PodsRestartLimits) ([]restart.RestartWarning, bool, error)
}

type ProxyRestart struct {
k8sClient client.Client
podsLister *pods.Pods
logger *logr.Logger
}

func NewProxyRestarter() *ProxyRestart {
return &ProxyRestart{}
func NewProxyRestarter(c client.Client, podsLister *pods.Pods, logger *logr.Logger) *ProxyRestart {
return &ProxyRestart{
k8sClient: c,
podsLister: podsLister,
logger: logger,
}
}

func (p *ProxyRestart) RestartProxies(ctx context.Context, c client.Client, expectedImage predicates.SidecarImage, expectedResources v1.ResourceRequirements, istioCR *v1alpha2.Istio, logger *logr.Logger) ([]restart.RestartWarning, bool, error) {
func (p *ProxyRestart) RestartProxies(ctx context.Context, expectedImage predicates.SidecarImage, expectedResources v1.ResourceRequirements, istioCR *v1alpha2.Istio) ([]restart.RestartWarning, bool, error) {
compatibiltyPredicate, err := predicates.NewCompatibilityRestartPredicate(istioCR)
if err != nil {
logger.Error(err, "Failed to create restart compatibility predicate")
p.logger.Error(err, "Failed to create restart compatibility predicate")
return nil, false, err
}

predicates := []predicates.SidecarProxyPredicate{compatibiltyPredicate,
predicates.NewImageResourcesPredicate(expectedImage, expectedResources),
}

err = p.restartKymaProxies(ctx, c, predicates, logger)
err = p.restartKymaProxies(ctx, predicates)
if err != nil {
logger.Error(err, "Failed to restart Kyma proxies")
p.logger.Error(err, "Failed to restart Kyma proxies")
return nil, false, err
}

warnings, hasMorePodsToRestart, err := p.restartCustomerProxies(ctx, c, predicates, logger)
warnings, hasMorePodsToRestart, err := p.restartCustomerProxies(ctx, predicates)
if err != nil {
logger.Error(err, "Failed to restart Customer proxies")
p.logger.Error(err, "Failed to restart Customer proxies")
}

return warnings, hasMorePodsToRestart, nil
}

func (p *ProxyRestart) RestartWithPredicates(ctx context.Context, c client.Client, preds []predicates.SidecarProxyPredicate, limits *pods.PodsRestartLimits, logger *logr.Logger) ([]restart.RestartWarning, bool, error) {
podsToRestart, err := pods.GetPodsToRestart(ctx, c, preds, limits, logger)
func (p *ProxyRestart) RestartWithPredicates(ctx context.Context, preds []predicates.SidecarProxyPredicate, limits *pods.PodsRestartLimits) ([]restart.RestartWarning, bool, error) {
podsToRestart, err := p.podsLister.GetPodsToRestart(ctx, preds, limits)
if err != nil {
logger.Error(err, "Getting Kyma pods to restart failed")
p.logger.Error(err, "Getting Kyma pods to restart failed")
return nil, false, err
}

warnings, err := restart.Restart(ctx, c, podsToRestart, logger, true)
warnings, err := restart.Restart(ctx, p.k8sClient, podsToRestart, p.logger, true)
if err != nil {
logger.Error(err, "Restarting Kyma pods failed")
p.logger.Error(err, "Restarting Kyma pods failed")
return nil, false, err
}

// if there are more pods to restart there should be a continue token in the pod list
return warnings, podsToRestart.Continue != "", nil
}

func (p *ProxyRestart) restartKymaProxies(ctx context.Context, c client.Client, preds []predicates.SidecarProxyPredicate, logger *logr.Logger) error {
func (p *ProxyRestart) restartKymaProxies(ctx context.Context, preds []predicates.SidecarProxyPredicate) error {
preds = append(preds, predicates.KymaWorkloadRestartPredicate{})
limits := pods.NewPodsRestartLimits(math.MaxInt, math.MaxInt)

_, _, err := p.RestartWithPredicates(ctx, c, preds, limits, logger)
_, _, err := p.RestartWithPredicates(ctx, preds, limits)
if err != nil {
logger.Error(err, "Failed to restart Kyma proxies")
p.logger.Error(err, "Failed to restart Kyma proxies")
return err
}

logger.Info("Kyma proxy restart completed")
p.logger.Info("Kyma proxy restart completed")
return nil
}

func (p *ProxyRestart) restartCustomerProxies(ctx context.Context, c client.Client, preds []predicates.SidecarProxyPredicate, logger *logr.Logger) ([]restart.RestartWarning, bool, error) {
func (p *ProxyRestart) restartCustomerProxies(ctx context.Context, preds []predicates.SidecarProxyPredicate) ([]restart.RestartWarning, bool, error) {
preds = append(preds, predicates.CustomerWorkloadRestartPredicate{})
limits := pods.NewPodsRestartLimits(podsToRestartLimit, podsToListLimit)

warnings, hasMorePodsToRestart, err := p.RestartWithPredicates(ctx, c, preds, limits, logger)
warnings, hasMorePodsToRestart, err := p.RestartWithPredicates(ctx, preds, limits)
if err != nil {
logger.Error(err, "Failed to restart Customer proxies")
p.logger.Error(err, "Failed to restart Customer proxies")
return nil, false, err
}

if !hasMorePodsToRestart {
logger.Info("Customer proxy restart completed")
p.logger.Info("Customer proxy restart completed")
} else {
logger.Info("Customer proxy restart only partially completed")
p.logger.Info("Customer proxy restart only partially completed")
}

return warnings, hasMorePodsToRestart, nil
Expand Down
Loading

0 comments on commit e357e4d

Please sign in to comment.