Skip to content

Commit

Permalink
dry run manager
Browse files Browse the repository at this point in the history
  • Loading branch information
s-urbaniak committed Feb 3, 2025
1 parent 5a72928 commit 55d7aff
Show file tree
Hide file tree
Showing 11 changed files with 606 additions and 71 deletions.
9 changes: 7 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {
setupLog := logger.Named("setup").Sugar()
setupLog.Info("starting with configuration", zap.Any("config", config), zap.Any("version", version.Version))

mgr, err := operator.NewBuilder(operator.ManagerProviderFunc(ctrl.NewManager), akoScheme, time.Duration(minimumIndependentSyncPeriod)*time.Minute).
runnable, err := operator.NewBuilder(operator.ManagerProviderFunc(ctrl.NewManager), akoScheme, time.Duration(minimumIndependentSyncPeriod)*time.Minute).
WithConfig(ctrl.GetConfigOrDie()).
WithNamespaces(collection.Keys(config.WatchedNamespaces)...).
WithLogger(logger).
Expand All @@ -86,6 +86,7 @@ func main() {
WithAPISecret(config.GlobalAPISecret).
WithDeletionProtection(config.ObjectDeletionProtection).
WithIndependentSyncPeriod(time.Duration(config.IndependentSyncPeriod) * time.Minute).
WithDryRun(config.DryRun).
Build(ctx)
if err != nil {
setupLog.Error(err, "unable to start operator")
Expand All @@ -94,7 +95,7 @@ func main() {

setupLog.Info(subobjectDeletionProtectionMessage)
setupLog.Info("starting manager")
if err = mgr.Start(ctx); err != nil {
if err = runnable.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand All @@ -113,6 +114,7 @@ type Config struct {
SubObjectDeletionProtection bool
IndependentSyncPeriod int
FeatureFlags *featureflags.FeatureFlags
DryRun bool
}

// ParseConfiguration fills the 'OperatorConfig' from the flags passed to the program
Expand All @@ -139,6 +141,9 @@ func parseConfiguration() Config {
independentSyncPeriod,
fmt.Sprintf("The default time, in minutes, between reconciliations for independent custom resources. (default %d, minimum %d)", independentSyncPeriod, minimumIndependentSyncPeriod),
)
// TODO: uncomment when the dry-run implementation is finished
flag.BoolVar(&config.DryRun, "dry-run", false, "If set, the operator will not perform any changes to the Atlas resources, run all reconcilers only Once and emit events for all planned changes")

appVersion := flag.Bool("v", false, "prints application version")
flag.Parse()

Expand Down
7 changes: 5 additions & 2 deletions internal/controller/atlasproject/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ func (r *AtlasProjectReconciler) handleProject(ctx *workflow.Context, orgID stri
return r.delete(ctx, services, orgID, atlasProject)
case !existInAtlas && wasDeleted:
return r.release(ctx, atlasProject)
case existInAtlas && !wasDeleted && atlasProject.Status.ID == "":
return r.manage(ctx, atlasProject, projectInAtlas.ID)
}

// short circuit the "manage" state,
// there is no need to wait another reconcile cycle to continue.
_, _ = r.manage(ctx, atlasProject, projectInAtlas.ID)
atlasProject.Status.ID = projectInAtlas.ID

if err = r.ensureX509(ctx, atlasProject); err != nil {
return r.terminate(ctx, workflow.Internal, err)
}
Expand Down
24 changes: 19 additions & 5 deletions internal/controller/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlas"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasbackupcompliancepolicy"
Expand All @@ -22,15 +21,16 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasproject"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlassearchindexconfig"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/atlasstream"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/dryrun"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags"
)

// ManagerAware is implemented by components that can register themselves
// with a controller-runtime manager.
type ManagerAware interface {
// SetupWithManager wires the implementer into mgr and returns an error if
// the registration fails.
// NOTE(review): skipNameValidation presumably disables controller-name
// validation in controller-runtime — confirm against the implementations.
SetupWithManager(mgr ctrl.Manager, skipNameValidation bool) error
}

type AkoReconciler interface {
reconcile.Reconciler
type Reconciler interface {
dryrun.Reconciler
ManagerAware
}

Expand All @@ -41,7 +41,7 @@ type Registry struct {
featureFlags *featureflags.FeatureFlags

logger *zap.Logger
reconcilers []AkoReconciler
reconcilers []Reconciler
}

func NewRegistry(predicates []predicate.Predicate, deletionProtection bool, logger *zap.Logger, independentSyncPeriod time.Duration, featureFlags *featureflags.FeatureFlags) *Registry {
Expand All @@ -54,6 +54,16 @@ func NewRegistry(predicates []predicate.Predicate, deletionProtection bool, logg
}
}

// RegisterWithDryRunManager builds the registry's reconcilers (via
// registerControllers, using mgr as the cluster) and hands each of them to
// the dry-run manager through SetupReconciler.
//
// It currently always returns nil; the error result keeps the signature
// parallel to RegisterWithManager.
func (r *Registry) RegisterWithDryRunManager(mgr *dryrun.Manager, ap atlas.Provider) error {
	r.registerControllers(mgr, ap)

	// Register in slice order so the dry-run manager sees the reconcilers in
	// the same order in which they were constructed.
	for i := range r.reconcilers {
		mgr.SetupReconciler(r.reconcilers[i])
	}

	return nil
}

func (r *Registry) RegisterWithManager(mgr ctrl.Manager, skipNameValidation bool, ap atlas.Provider) error {
r.registerControllers(mgr, ap)

Expand All @@ -66,7 +76,11 @@ func (r *Registry) RegisterWithManager(mgr ctrl.Manager, skipNameValidation bool
}

func (r *Registry) registerControllers(c cluster.Cluster, ap atlas.Provider) {
var reconcilers []AkoReconciler
if len(r.reconcilers) > 0 {
return
}

var reconcilers []Reconciler
reconcilers = append(reconcilers, atlasproject.NewAtlasProjectReconciler(c, r.predicates, ap, r.deletionProtection, r.logger))
reconcilers = append(reconcilers, atlasdeployment.NewAtlasDeploymentReconciler(c, r.predicates, ap, r.deletionProtection, r.independentSyncPeriod, r.logger))
reconcilers = append(reconcilers, atlasdatabaseuser.NewAtlasDatabaseUserReconciler(c, r.predicates, ap, r.deletionProtection, r.independentSyncPeriod, r.featureFlags, r.logger))
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/workflow/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"

"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/dryrun"
)

const (
Expand Down Expand Up @@ -41,6 +43,8 @@ func Requeue(period time.Duration) Result {
// 'reason' and 'message' indicate the error state and are supposed to be reflected in the `conditions` for the
// reconciled Custom Resource.
func Terminate(reason ConditionReason, err error) Result {
dryrun.AddTerminationError(err) // TODO: factor this in favor of controller-runtime error handling

return Result{
terminated: true,
requeueAfter: DefaultRetry,
Expand Down Expand Up @@ -78,6 +82,8 @@ func (r Result) IsDeleted() bool {
// TerminateSilently indicates that the reconciliation logic cannot proceed and needs to be finished (and possibly requeued)
// The status of the reconciled Custom Resource is not supposed to be updated.
func TerminateSilently(err error) Result {
dryrun.AddTerminationError(err)

return Result{terminated: true, requeueAfter: DefaultRetry}
}

Expand Down
44 changes: 16 additions & 28 deletions internal/dryrun/error.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,33 @@
package dryrun

import (
"errors"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)

const dryRunErrorPrefix = "DryRun event: "

type DryRunError struct {
GVK string
Namespace, Name string
EventType, Reason, Msg string
Msg string
}

func NewDryRunError(kind schema.ObjectKind, meta metav1.ObjectMetaAccessor, eventtype, reason, messageFmt string, args ...interface{}) error {
gvk := "unknown"
if kind != nil {
gvk = kind.GroupVersionKind().String()
}

namespace, name := "unknown", "unknown"
if meta != nil {
namespace = meta.GetObjectMeta().GetNamespace()
name = meta.GetObjectMeta().GetName()
}

func NewDryRunError(messageFmt string, args ...interface{}) error {
msg := fmt.Sprintf(messageFmt, args...)

return &DryRunError{
GVK: gvk,
Namespace: namespace,
Name: name,
EventType: eventtype,
Reason: reason,
Msg: msg,
Msg: msg,
}
}

func (e *DryRunError) Error() string {
return fmt.Sprintf(
"DryRun event GVK=%v, Namespace=%v, Name=%v, EventType=%v, Reason=%v, Message=%v",
e.GVK, e.Namespace, e.Name, e.EventType, e.Reason, e.Msg,
)
return dryRunErrorPrefix + e.Msg
}

// isDryRunError reports whether err, or any error it wraps, is a *DryRunError.
//
// Note: this is deliberately unexported. Reconcilers must not special-case
// dry-run; they should behave exactly the same during a dry-run as during a
// regular reconcile.
func isDryRunError(err error) bool {
	var target *DryRunError
	return errors.As(err, &target)
}
50 changes: 50 additions & 0 deletions internal/dryrun/error_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package dryrun

import (
"errors"
"sync"
)

// errorQueue accumulates the errors reported through AddTerminationError.
// It only records errors once it has been activated via enableErrors; until
// then every reported error is dropped.
type errorQueue struct {
	mu sync.Mutex // protects fields below

	active bool    // when false, AddTerminationError is a no-op
	errs   []error // errors recorded since the last clearTerminationErrors
}

// reconcileErrors is the package-wide queue shared by all functions below.
var reconcileErrors = &errorQueue{}

// AddTerminationError records err in the package-wide error queue.
//
// The error is dropped when the queue has not been enabled (see
// enableErrors) or when err is nil. It is safe for concurrent use.
func AddTerminationError(err error) {
	reconcileErrors.mu.Lock()
	defer reconcileErrors.mu.Unlock()

	// A nil error carries no information and errors.Join would discard it
	// anyway, so do not let it take up space in the queue.
	if !reconcileErrors.active || err == nil {
		return
	}

	reconcileErrors.errs = append(reconcileErrors.errs, err)
}

// terminationError returns all recorded errors joined into a single error,
// or nil if nothing has been recorded. The queue itself is left untouched.
func terminationError() error {
	reconcileErrors.mu.Lock()
	defer reconcileErrors.mu.Unlock()

	// errors.Join copies the non-nil errors into a slice of its own, so the
	// returned error does not alias the queue's backing array and no
	// defensive copy is needed here.
	return errors.Join(reconcileErrors.errs...)
}

// clearTerminationErrors discards all recorded errors. The enabled/disabled
// state of the queue is not changed.
func clearTerminationErrors() {
	reconcileErrors.mu.Lock()
	defer reconcileErrors.mu.Unlock()

	reconcileErrors.errs = nil
}

// enableErrors activates the queue so that subsequent AddTerminationError
// calls are recorded.
func enableErrors() {
	reconcileErrors.mu.Lock()
	defer reconcileErrors.mu.Unlock()

	reconcileErrors.active = true
}
Loading

0 comments on commit 55d7aff

Please sign in to comment.