Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Label CRs with new global account id #1158

Merged
merged 38 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/broker/broker_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,10 @@ func (s *BrokerSuiteTest) CreateAPI(inputFactory broker.PlanValidator, cfg *Conf
planDefaults := func(planID string, platformProvider internal.CloudProvider, provider *internal.CloudProvider) (*gqlschema.ClusterConfigInput, error) {
return &gqlschema.ClusterConfigInput{}, nil
}
var fakeKcpK8sClient = fake.NewClientBuilder().Build()
kcBuilder := &kcMock.KcBuilder{}
kcBuilder.On("Build", nil).Return("--kubeconfig file", nil)
createAPI(s.router, servicesConfig, inputFactory, cfg, db, provisioningQueue, deprovisionQueue, updateQueue, lager.NewLogger("api"), logs, planDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient)
createAPI(s.router, servicesConfig, inputFactory, cfg, db, provisioningQueue, deprovisionQueue, updateQueue, lager.NewLogger("api"), logs, planDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient, fakeKcpK8sClient)

s.httpServer = httptest.NewServer(s.router)
}
Expand Down
30 changes: 15 additions & 15 deletions cmd/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ func main() {
}

cfg.OrchestrationConfig.KubernetesVersion = cfg.Provisioner.KubernetesVersion

// create logger
logger := lager.NewLogger("kyma-env-broker")

Expand All @@ -252,11 +251,11 @@ func main() {
provisionerClient := provisioner.NewProvisionerClient(cfg.Provisioner.URL, cfg.DumpProvisionerRequests, logs.WithField("service", "provisioner"))

// create kubernetes client
k8sCfg, err := config.GetConfig()
kcpK8sConfig, err := config.GetConfig()
fatalOnError(err, logs)
cli, err := initClient(k8sCfg)
kcpK8sClient, err := initClient(kcpK8sConfig)
fatalOnError(err, logs)
skrK8sClientProvider := kubeconfig.NewK8sClientFromSecretProvider(cli)
skrK8sClientProvider := kubeconfig.NewK8sClientFromSecretProvider(kcpK8sClient)

// create storage
cipher := storage.NewEncrypter(cfg.Database.SecretKey)
Expand All @@ -280,7 +279,7 @@ func main() {

// provides configuration for specified Kyma version and plan
configProvider := kebConfig.NewConfigProvider(
kebConfig.NewConfigMapReader(ctx, cli, logs, cfg.RuntimeConfigurationConfigMapName),
kebConfig.NewConfigMapReader(ctx, kcpK8sClient, logs, cfg.RuntimeConfigurationConfigMapName),
kebConfig.NewConfigMapKeysValidator(),
kebConfig.NewConfigMapConverter())
gardenerClusterConfig, err := gardener.NewGardenerClusterConfig(cfg.Gardener.KubeconfigPath)
Expand Down Expand Up @@ -317,25 +316,25 @@ func main() {
// run queues
provisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Provisioning, logs.WithField("provisioning", "manager"))
provisionQueue := NewProvisioningProcessingQueue(ctx, provisionManager, cfg.Provisioning.WorkersAmount, &cfg, db, provisionerClient, inputFactory,
edpClient, accountProvider, skrK8sClientProvider, cli, oidcDefaultValues, logs)
edpClient, accountProvider, skrK8sClientProvider, kcpK8sClient, oidcDefaultValues, logs)

deprovisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Deprovisioning, logs.WithField("deprovisioning", "manager"))
deprovisionQueue := NewDeprovisioningProcessingQueue(ctx, cfg.Deprovisioning.WorkersAmount, deprovisionManager, &cfg, db, eventBroker, provisionerClient, edpClient, accountProvider,
skrK8sClientProvider, cli, configProvider, logs)
skrK8sClientProvider, kcpK8sClient, configProvider, logs)

updateManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Update, logs.WithField("update", "manager"))
updateQueue := NewUpdateProcessingQueue(ctx, updateManager, cfg.Update.WorkersAmount, db, inputFactory, provisionerClient, eventBroker,
cfg, skrK8sClientProvider, cli, logs)
cfg, skrK8sClientProvider, kcpK8sClient, logs)
/***/
servicesConfig, err := broker.NewServicesConfigFromFile(cfg.CatalogFilePath)
fatalOnError(err, logs)

// create kubeconfig builder
kcBuilder := kubeconfig.NewBuilder(provisionerClient, cli, skrK8sClientProvider)
kcBuilder := kubeconfig.NewBuilder(provisionerClient, kcpK8sClient, skrK8sClientProvider)

// create server
router := mux.NewRouter()
createAPI(router, servicesConfig, inputFactory, &cfg, db, provisionQueue, deprovisionQueue, updateQueue, logger, logs, inputFactory.GetPlanDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient)
createAPI(router, servicesConfig, inputFactory, &cfg, db, provisionQueue, deprovisionQueue, updateQueue, logger, logs, inputFactory.GetPlanDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, kcpK8sClient, gardenerClient)

// create metrics endpoint
router.Handle("/metrics", promhttp.Handler())
Expand All @@ -348,7 +347,7 @@ func main() {
runtimeResolver := orchestrationExt.NewGardenerRuntimeResolver(dynamicGardener, gardenerNamespace, runtimeLister, logs)

clusterQueue := NewClusterOrchestrationProcessingQueue(ctx, db, provisionerClient, eventBroker, inputFactory,
nil, time.Minute, runtimeResolver, notificationBuilder, logs, cli, cfg, 1)
nil, time.Minute, runtimeResolver, notificationBuilder, logs, kcpK8sClient, cfg, 1)

// TODO: in case of cluster upgrade the same Azure Zones must be send to the Provisioner
orchestrationHandler := orchestrate.NewOrchestrationHandler(db, clusterQueue, cfg.MaxPaginationPage, logs)
Expand Down Expand Up @@ -380,7 +379,7 @@ func main() {
runtimeHandler := runtime.NewHandler(db.Instances(), db.Operations(),
db.RuntimeStates(), db.InstancesArchived(), cfg.MaxPaginationPage,
cfg.DefaultRequestRegion, provisionerClient,
cli,
kcpK8sClient,
cfg.Broker.KimConfig,
logs)
runtimeHandler.AttachRoutes(router)
Expand Down Expand Up @@ -410,9 +409,10 @@ func logConfiguration(logs *logrus.Logger, cfg Config) {
cfg.Broker.KimConfig.Plans,
cfg.Broker.KimConfig.KimOnlyPlans)
logs.Infof("Is SubaccountMovementEnabled: %t", cfg.Broker.SubaccountMovementEnabled)
logs.Infof("Is UpdateCustomResouresLabelsOnAccountMove enabled: %t", cfg.Broker.UpdateCustomResouresLabelsOnAccountMove)
}

func createAPI(router *mux.Router, servicesConfig broker.ServicesConfig, planValidator broker.PlanValidator, cfg *Config, db storage.BrokerStorage, provisionQueue, deprovisionQueue, updateQueue *process.Queue, logger lager.Logger, logs logrus.FieldLogger, planDefaults broker.PlanDefaults, kcBuilder kubeconfig.KcBuilder, clientProvider K8sClientProvider, kubeconfigProvider KubeconfigProvider, gardenerClient client.Client) {
func createAPI(router *mux.Router, servicesConfig broker.ServicesConfig, planValidator broker.PlanValidator, cfg *Config, db storage.BrokerStorage, provisionQueue, deprovisionQueue, updateQueue *process.Queue, logger lager.Logger, logs logrus.FieldLogger, planDefaults broker.PlanDefaults, kcBuilder kubeconfig.KcBuilder, clientProvider K8sClientProvider, kubeconfigProvider KubeconfigProvider, gardenerClient, kcpK8sClient client.Client) {
suspensionCtxHandler := suspension.NewContextUpdateHandler(db.Operations(), provisionQueue, deprovisionQueue, logs)

defaultPlansConfig, err := servicesConfig.DefaultPlansConfig()
Expand Down Expand Up @@ -443,8 +443,8 @@ func createAPI(router *mux.Router, servicesConfig broker.ServicesConfig, planVal
),
DeprovisionEndpoint: broker.NewDeprovision(db.Instances(), db.Operations(), deprovisionQueue, logs),
UpdateEndpoint: broker.NewUpdate(cfg.Broker, db.Instances(), db.RuntimeStates(), db.Operations(),
suspensionCtxHandler, cfg.UpdateProcessingEnabled, cfg.Broker.SubaccountMovementEnabled, updateQueue, defaultPlansConfig,
planDefaults, logs, cfg.KymaDashboardConfig, kcBuilder, convergedCloudRegionProvider),
suspensionCtxHandler, cfg.UpdateProcessingEnabled, cfg.Broker.SubaccountMovementEnabled, cfg.Broker.UpdateCustomResouresLabelsOnAccountMove, updateQueue, defaultPlansConfig,
planDefaults, logs, cfg.KymaDashboardConfig, kcBuilder, convergedCloudRegionProvider, kcpK8sClient),
GetInstanceEndpoint: broker.NewGetInstance(cfg.Broker, db.Instances(), db.Operations(), kcBuilder, logs),
LastOperationEndpoint: broker.NewLastOperation(db.Operations(), db.InstancesArchived(), logs),
BindEndpoint: broker.NewBind(cfg.Broker.Binding, db.Instances(), logs, clientProvider, kubeconfigProvider, gardenerClient, cfg.BindingTokenExpirationSeconds),
Expand Down
4 changes: 3 additions & 1 deletion internal/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
const (
KymaServiceID = "47c9dcbf-ff30-448e-ab36-d3bad66ba281"
KymaServiceName = "kymaruntime"
KymaNamespace = "kyma-system"
)

type PlanDefaults func(planID string, platformProvider internal.CloudProvider, parametersProvider *internal.CloudProvider) (*gqlschema.ClusterConfigInput, error)
Expand Down Expand Up @@ -55,7 +56,8 @@ type Config struct {

DisableSapConvergedCloud bool `envconfig:"default=false"`

SubaccountMovementEnabled bool `envconfig:"default=false"`
SubaccountMovementEnabled bool `envconfig:"default=false"`
UpdateCustomResouresLabelsOnAccountMove bool `envconfig:"default=false"`
}

type ServicesConfig map[string]Service
Expand Down
108 changes: 81 additions & 27 deletions internal/broker/instance_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package broker
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
Expand All @@ -12,12 +13,16 @@ import (

"github.com/kyma-incubator/compass/components/director/pkg/jsonschema"
"github.com/kyma-project/kyma-environment-broker/internal/euaccess"
"github.com/kyma-project/kyma-environment-broker/internal/k8s"
"github.com/kyma-project/kyma-environment-broker/internal/kubeconfig"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/google/uuid"
"github.com/pivotal-cf/brokerapi/v8/domain"
"github.com/pivotal-cf/brokerapi/v8/domain/apiresponses"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/kyma-project/kyma-environment-broker/internal"
Expand All @@ -35,12 +40,13 @@ type UpdateEndpoint struct {
config Config
log logrus.FieldLogger

instanceStorage storage.Instances
runtimeStates storage.RuntimeStates
contextUpdateHandler ContextUpdateHandler
brokerURL string
processingEnabled bool
subaccountMovementEnabled bool
instanceStorage storage.Instances
runtimeStates storage.RuntimeStates
contextUpdateHandler ContextUpdateHandler
brokerURL string
processingEnabled bool
subaccountMovementEnabled bool
updateCustomResouresLabelsOnAccountMove bool

operationStorage storage.Operations

Expand All @@ -53,6 +59,8 @@ type UpdateEndpoint struct {
kcBuilder kubeconfig.KcBuilder

convergedCloudRegionsProvider ConvergedCloudRegionProvider

kcpClient client.Client
}

func NewUpdate(cfg Config,
Expand All @@ -62,29 +70,33 @@ func NewUpdate(cfg Config,
ctxUpdateHandler ContextUpdateHandler,
processingEnabled bool,
subaccountMovementEnabled bool,
updateCustomResouresLabelsOnAccountMove bool,
queue Queue,
plansConfig PlansConfig,
planDefaults PlanDefaults,
log logrus.FieldLogger,
dashboardConfig dashboard.Config,
kcBuilder kubeconfig.KcBuilder,
convergedCloudRegionsProvider ConvergedCloudRegionProvider,
kcpClient client.Client,
) *UpdateEndpoint {
return &UpdateEndpoint{
config: cfg,
log: log.WithField("service", "UpdateEndpoint"),
instanceStorage: instanceStorage,
runtimeStates: runtimeStates,
operationStorage: operationStorage,
contextUpdateHandler: ctxUpdateHandler,
processingEnabled: processingEnabled,
subaccountMovementEnabled: subaccountMovementEnabled,
updatingQueue: queue,
plansConfig: plansConfig,
planDefaults: planDefaults,
dashboardConfig: dashboardConfig,
kcBuilder: kcBuilder,
convergedCloudRegionsProvider: convergedCloudRegionsProvider,
config: cfg,
log: log.WithField("service", "UpdateEndpoint"),
instanceStorage: instanceStorage,
runtimeStates: runtimeStates,
operationStorage: operationStorage,
contextUpdateHandler: ctxUpdateHandler,
processingEnabled: processingEnabled,
subaccountMovementEnabled: subaccountMovementEnabled,
updateCustomResouresLabelsOnAccountMove: updateCustomResouresLabelsOnAccountMove,
updatingQueue: queue,
plansConfig: plansConfig,
planDefaults: planDefaults,
dashboardConfig: dashboardConfig,
kcBuilder: kcBuilder,
convergedCloudRegionsProvider: convergedCloudRegionsProvider,
kcpClient: kcpClient,
}
}

Expand Down Expand Up @@ -347,20 +359,28 @@ func (b *UpdateEndpoint) processContext(instance *internal.Instance, details dom
instance.Parameters.ErsContext.Active = ersContext.Active
}

if b.subaccountMovementEnabled {
if instance.GlobalAccountID != ersContext.GlobalAccountID && ersContext.GlobalAccountID != "" {
if instance.SubscriptionGlobalAccountID == "" {
instance.SubscriptionGlobalAccountID = instance.GlobalAccountID
}
instance.GlobalAccountID = ersContext.GlobalAccountID
needUpdateCustomResources := false
if b.subaccountMovementEnabled && (instance.GlobalAccountID != ersContext.GlobalAccountID && ersContext.GlobalAccountID != "") {
if instance.SubscriptionGlobalAccountID == "" {
instance.SubscriptionGlobalAccountID = instance.GlobalAccountID
}

instance.GlobalAccountID = ersContext.GlobalAccountID
needUpdateCustomResources = true
}

newInstance, err := b.instanceStorage.Update(*instance)
if err != nil {
logger.Errorf("processing context updated failed: %s", err.Error())
return nil, changed, fmt.Errorf("unable to process the update")
} else if b.updateCustomResouresLabelsOnAccountMove && needUpdateCustomResources {
// update labels on related CRs, but only if account movement was successfully persisted and kept in database
err = b.updateLabels(newInstance.RuntimeID, newInstance.GlobalAccountID)
if err != nil {
// silent error by design for now
logger.Errorf("unable to update global account label on CRs while doing account move: %s", err.Error())
response := apiresponses.NewFailureResponse(fmt.Errorf("Update CR failed"), http.StatusInternalServerError, err.Error())
return newInstance, changed, response
}
}

return newInstance, changed, nil
Expand Down Expand Up @@ -389,3 +409,37 @@ func (b *UpdateEndpoint) getJsonSchemaValidator(provider internal.CloudProvider,

return jsonschema.NewValidatorFromStringSchema(schema)
}

func (b *UpdateEndpoint) updateLabels(id, newGlobalAccountId string) error {
kymaErr := b.updateCrLabel(id, k8s.KymaCr, newGlobalAccountId)
gardenerClusterErr := b.updateCrLabel(id, k8s.GardenerClusterCr, newGlobalAccountId)
runtimeErr := b.updateCrLabel(id, k8s.RuntimeCr, newGlobalAccountId)
err := errors.Join(kymaErr, gardenerClusterErr, runtimeErr)
return err
}

func (b *UpdateEndpoint) updateCrLabel(id, crName, newGlobalAccountId string) error {
jaroslaw-pieszka marked this conversation as resolved.
Show resolved Hide resolved
gvk, err := k8s.GvkByName(crName)
if err != nil {
return fmt.Errorf("while getting gvk for name: %s: %s", crName, err.Error())
}

var k8sObject unstructured.Unstructured
k8sObject.SetGroupVersionKind(gvk)
err = b.kcpClient.Get(context.Background(), types.NamespacedName{Namespace: KymaNamespace, Name: id}, &k8sObject)
if err != nil {
return fmt.Errorf("while getting k8s object of type %s from kcp cluster for instance %s, due to: %s", crName, id, err.Error())
}

err = k8s.AddOrOverrideMetadata(&k8sObject, k8s.GlobalAccountIdLabel, newGlobalAccountId)
jaroslaw-pieszka marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("while adding or overriding label (new=%s) for k8s object %s %s, because: %s", newGlobalAccountId, id, crName, err.Error())
}

err = b.kcpClient.Update(context.Background(), &k8sObject)
if err != nil {
return fmt.Errorf("while updating k8s object %s %s, because: %s", id, crName, err.Error())
}

return nil
}
Loading
Loading