Skip to content

Commit

Permalink
fix crossplane provider updates
Browse files Browse the repository at this point in the history
  • Loading branch information
vramk23 committed Nov 21, 2023
1 parent 731ef21 commit a0ffc19
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 60 deletions.
12 changes: 6 additions & 6 deletions capten/agent/internal/api/plugin_crossplane_provider_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ const (

func (a *Agent) AddCrossplanProvider(ctx context.Context, request *captenpluginspb.AddCrossplanProviderRequest) (
*captenpluginspb.AddCrossplanProviderResponse, error) {
if err := validateArgs(request.CloudType, request.ProviderName, request.CloudProviderId); err != nil {
if err := validateArgs(request.CloudType, request.CloudProviderId); err != nil {
a.log.Infof("request validation failed", err)
return &captenpluginspb.AddCrossplanProviderResponse{
Status: captenpluginspb.StatusCode_INVALID_ARGUMENT,
StatusMessage: "request validation failed",
}, nil
}
a.log.Infof("Add Crossplane Provider %s with cloud provider %s request recieved", request.ProviderName, request.CloudProviderId)
a.log.Infof("Add Crossplane Provider type %s with cloud provider %s request recieved", request.CloudType, request.CloudProviderId)

project, err := a.as.GetCrossplanProviderByCloudType(request.CloudType)
if err != nil {
Expand Down Expand Up @@ -55,7 +55,7 @@ func (a *Agent) AddCrossplanProvider(ctx context.Context, request *captenplugins
}, nil
}

a.log.Infof("Crossplane Provider %s added with id %s", request.ProviderName, id.String())
a.log.Infof("Crossplane Provider type %s added with id %s", request.CloudType, id.String())
return &captenpluginspb.AddCrossplanProviderResponse{
Id: id.String(),
Status: captenpluginspb.StatusCode_OK,
Expand Down Expand Up @@ -122,15 +122,15 @@ func (a *Agent) GetCrossplanProviders(ctx context.Context, _ *captenpluginspb.Ge
func (a *Agent) UpdateCrossplanProvider(ctx context.Context, request *captenpluginspb.UpdateCrossplanProviderRequest) (
*captenpluginspb.UpdateCrossplanProviderResponse, error) {

if err := validateArgs(request.Id, request.CloudType, request.ProviderName, request.CloudProviderId); err != nil {
if err := validateArgs(request.Id, request.CloudType, request.CloudProviderId); err != nil {
a.log.Infof("request validation failed", err)
return &captenpluginspb.UpdateCrossplanProviderResponse{
Status: captenpluginspb.StatusCode_INVALID_ARGUMENT,
StatusMessage: "request validation failed",
}, nil
}

a.log.Infof("Update Crossplane Provider %s, %s request recieved", request.Id, request.ProviderName)
a.log.Infof("Update Crossplane Provider %s, %s, %s request recieved", request.CloudType, request.Id, request.CloudProviderId)

project, err := a.as.GetCrossplanProviderById(request.Id)
if err != nil {
Expand Down Expand Up @@ -167,7 +167,7 @@ func (a *Agent) UpdateCrossplanProvider(ctx context.Context, request *captenplug
}, nil
}

a.log.Infof("Crossplane Provider with id %s, %s updated", request.Id, request.ProviderName)
a.log.Infof("Crossplane Provider type %s with id %s updated", request.CloudType, request.Id)
return &captenpluginspb.UpdateCrossplanProviderResponse{
Status: captenpluginspb.StatusCode_OK,
StatusMessage: "ok",
Expand Down
59 changes: 33 additions & 26 deletions capten/agent/internal/crossplane/cluster_claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,75 +44,77 @@ func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store)
return &ClusterClaimSyncHandler{log: log, dbStore: dbStore}
}

func RegisterK8SClusterClaimWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
func registerK8SClusterClaimWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
return k8s.RegisterDynamicInformers(NewClusterClaimSyncHandler(log, dbStore), dynamicClient, cgvk)
}

func getClusterClaimObj(obj any) *model.ClusterClaim {
func getClusterClaimObj(obj any) (*model.ClusterClaim, error) {
clusterClaimByte, err := json.Marshal(obj)
if err != nil {
return nil
return nil, err
}

var clObj model.ClusterClaim
err = json.Unmarshal(clusterClaimByte, &clObj)
if err != nil {
return nil
return nil, err
}

return &clObj
return &clObj, nil
}

func (h *ClusterClaimSyncHandler) OnAdd(obj interface{}) {
newCcObj := getClusterClaimObj(obj)
if newCcObj == nil {
return
}
h.log.Info("Crossplane ClusterCliam Add Callback")

k8sclient, err := k8s.NewK8SClient(h.log)
if err != nil {
newCcObj, err := getClusterClaimObj(obj)
if newCcObj == nil {
h.log.Errorf("failed to read ClusterCliam object, %v", err)
return
}

if err = h.updateManagedClusters(k8sclient, []model.ClusterClaim{*newCcObj}); err != nil {
if err = h.updateManagedClusters([]model.ClusterClaim{*newCcObj}); err != nil {
h.log.Errorf("failed to update ClusterCliam object, %v", err)
return
}

h.log.Info("cluster-claims resources synched")
}

func (h *ClusterClaimSyncHandler) OnUpdate(oldObj, newObj interface{}) {
prevObj := getClusterClaimObj(oldObj)
h.log.Info("Crossplane ClusterCliam Update Callback")

prevObj, err := getClusterClaimObj(oldObj)
if prevObj == nil {
h.log.Errorf("failed to read ClusterCliam old object %v", err)
return
}

newCcObj := getClusterClaimObj(oldObj)
newCcObj, err := getClusterClaimObj(oldObj)
if newCcObj == nil {
h.log.Errorf("failed to read ClusterCliam new object %v", err)
return
}

// We receive the objects details on configured interval, identify actual updates made on the obj.
if newCcObj.Metadata.ResourceVersion == newCcObj.Metadata.ResourceVersion {
if newCcObj.Metadata.ResourceVersion == prevObj.Metadata.ResourceVersion {
h.log.Info("Crossplane ClusterCliam previous version is same")
return
}

k8sclient, err := k8s.NewK8SClient(h.log)
if err != nil {
return
}

if err = h.updateManagedClusters(k8sclient, []model.ClusterClaim{*newCcObj}); err != nil {
if err = h.updateManagedClusters([]model.ClusterClaim{*newCcObj}); err != nil {
h.log.Errorf("failed to update ClusterCliam object, %v", err)
return
}

h.log.Info("cluster-claims resources synched")
}

func (h *ClusterClaimSyncHandler) OnDelete(obj interface{}) {}
func (h *ClusterClaimSyncHandler) OnDelete(obj interface{}) {
h.log.Info("Crossplane ClusterCliam Delete Callback")
}

func (h *ClusterClaimSyncHandler) Sync() error {
h.log.Debug("started to sync cluster-claims resources")
h.log.Debug("started to sync ClusterCliam resources")

k8sclient, err := k8s.NewK8SClient(h.log)
if err != nil {
Expand All @@ -135,14 +137,19 @@ func (h *ClusterClaimSyncHandler) Sync() error {
return fmt.Errorf("failed to unmarshal cluster claim resources, %v", err)
}

if err = h.updateManagedClusters(k8sclient, clObj.Items); err != nil {
if err = h.updateManagedClusters(clObj.Items); err != nil {
return fmt.Errorf("failed to update clusters in DB, %v", err)
}
h.log.Info("cluster-claims resources synched")
return nil
}

func (h *ClusterClaimSyncHandler) updateManagedClusters(k8sClient *k8s.K8SClient, clusterCliams []model.ClusterClaim) error {
func (h *ClusterClaimSyncHandler) updateManagedClusters(clusterCliams []model.ClusterClaim) error {
k8sclient, err := k8s.NewK8SClient(h.log)
if err != nil {
return fmt.Errorf("failed to get k8s client, %v", err)
}

clusters, err := h.getManagedClusters()
if err != nil {
return fmt.Errorf("failed to get managed clusters from DB, %v", err)
Expand All @@ -169,7 +176,7 @@ func (h *ClusterClaimSyncHandler) updateManagedClusters(k8sClient *k8s.K8SClient

if status.Status == readyStatusValue {
secretName := fmt.Sprintf(clusterSecretName, clusterCliam.Spec.Id)
resp, err := k8sClient.GetSecretData(clusterCliam.Metadata.Namespace, secretName)
resp, err := k8sclient.GetSecretData(clusterCliam.Metadata.Namespace, secretName)
if err != nil {
h.log.Errorf("failed to get secret %s/%s, %v", clusterCliam.Metadata.Namespace, secretName, err)
continue
Expand Down
35 changes: 22 additions & 13 deletions capten/agent/internal/crossplane/package_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,69 @@ func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) *Pr
return &ProvidersSyncHandler{log: log, dbStore: dbStore}
}

func RegisterK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
func registerK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
return k8s.RegisterDynamicInformers(NewProvidersSyncHandler(log, dbStore), dynamicClient, pgvk)
}

func getProviderObj(obj any) *model.Provider {
func getProviderObj(obj any) (*model.Provider, error) {
clusterClaimByte, err := json.Marshal(obj)
if err != nil {
return nil
return nil, err
}

var clObj model.Provider
err = json.Unmarshal(clusterClaimByte, &clObj)
if err != nil {
return nil
return nil, err
}

return &clObj
return &clObj, nil
}

func (h *ProvidersSyncHandler) OnAdd(obj interface{}) {
newCcObj := getProviderObj(obj)
h.log.Info("Crossplane Provider Add Callback")
newCcObj, err := getProviderObj(obj)
if newCcObj == nil {
h.log.Errorf("failed to read Provider object, %v", err)
return
}

if err := h.updateCrossplaneProvider([]model.Provider{*newCcObj}); err != nil {
h.log.Errorf("failed to update Provider object, %v", err)
return
}

h.log.Info("Crossplane Provider resources synched")
}

func (h *ProvidersSyncHandler) OnUpdate(oldObj, newObj interface{}) {
prevObj := getProviderObj(oldObj)
h.log.Info("Crossplane Provider Update Callback")
prevObj, err := getProviderObj(oldObj)
if prevObj == nil {
h.log.Errorf("failed to read Provider old object %v", err)
return
}

newCcObj := getProviderObj(oldObj)
newCcObj, err := getProviderObj(oldObj)
if newCcObj == nil {
h.log.Errorf("failed to read Provider new object %v", err)
return
}

// We receive the objects details on configured interval, identify actual updates made on the obj.
if newCcObj.ResourceVersion == newCcObj.ResourceVersion {
if newCcObj.ResourceVersion == prevObj.ResourceVersion {
h.log.Info("Crossplane Provider previous version is same")
return
}

if err := h.updateCrossplaneProvider([]model.Provider{*newCcObj}); err != nil {
h.log.Errorf("failed to update Provider object, %v", err)
return
}

h.log.Info("Crossplane Provider resources synched")
}

func (h *ProvidersSyncHandler) OnDelete(obj interface{}) {}
func (h *ProvidersSyncHandler) OnDelete(obj interface{}) {
h.log.Info("Crossplane Provider Delete Callback")
}

func (h *ProvidersSyncHandler) Sync() error {
h.log.Debug("started to sync CrossplaneProvider resources")
Expand Down Expand Up @@ -127,6 +135,7 @@ func (h *ProvidersSyncHandler) updateCrossplaneProvider(k8sProviders []model.Pro
}

for _, k8sProvider := range k8sProviders {
h.log.Infof("processing Crossplane Provider %s", k8sProvider.Name)
for _, providerStatus := range k8sProvider.Status.Conditions {
if providerStatus.Type != model.TypeHealthy {
continue
Expand Down
9 changes: 2 additions & 7 deletions capten/agent/internal/crossplane/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@ func RegisterK8SWatcher(log logging.Logger, dbStore *captenstore.Store) error {
return fmt.Errorf("failed to initalize k8s client: %v", err)
}

err = RegisterK8SClusterClaimWatcher(log, dbStore, k8sclient.DynamicClientInterface)
err = registerK8SClusterClaimWatcher(log, dbStore, k8sclient.DynamicClientInterface)
if err != nil {
return fmt.Errorf("failed to RegisterK8SClusterClaimWatcher: %v", err)
}

err = RegisterK8SProviderWatcher(log, dbStore, k8sclient.DynamicClientInterface)
err = registerK8SProviderWatcher(log, dbStore, k8sclient.DynamicClientInterface)
if err != nil {
return fmt.Errorf("failed to RegisterK8SProviderWatcher: %v", err)
}

cc := NewClusterClaimSyncHandler(log, dbStore)
go cc.Sync()
ps := NewProvidersSyncHandler(log, dbStore)
go ps.Sync()
return nil
}
6 changes: 3 additions & 3 deletions server/pkg/agent/agent_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func (s *AgentHandler) UpdateAgent(clusterID string, agentCfg *Config) error {
}

func (s *AgentHandler) GetAgent(orgId, clusterID string) (*Agent, error) {
agent := s.getAgent(clusterID)
/*agent := s.getAgent(clusterID)
if agent != nil {
return agent, nil
}
}*/

cfg, err := s.getAgentConfig(orgId, clusterID)
if err != nil {
Expand All @@ -69,7 +69,7 @@ func (s *AgentHandler) GetAgent(orgId, clusterID string) (*Agent, error) {
return nil, err
}

agent = s.getAgent(clusterID)
agent := s.getAgent(clusterID)
if agent != nil {
return agent, nil
}
Expand Down
10 changes: 5 additions & 5 deletions server/pkg/api/plugin_crossplane_provider_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func (s *Server) AddCrossplanProvider(ctx context.Context, request *captenpluginspb.AddCrossplanProviderRequest) (
*captenpluginspb.AddCrossplanProviderResponse, error) {
orgId, clusterId, err := validateOrgClusterWithArgs(ctx, request.CloudProviderId, request.CloudType, request.ProviderName)
orgId, clusterId, err := validateOrgClusterWithArgs(ctx, request.CloudProviderId, request.CloudType)
if err != nil {
s.log.Infof("request validation failed", err)
return &captenpluginspb.AddCrossplanProviderResponse{
Expand Down Expand Up @@ -39,7 +39,7 @@ func (s *Server) AddCrossplanProvider(ctx context.Context, request *captenplugin
}

if response.Status != captenpluginspb.StatusCode_OK {
s.log.Errorf("failed to add the ClusterProject for cluster %s, %s, %s", response.Status, response.StatusMessage)
s.log.Errorf("failed to add the CrossplanProvider for cluster %s, %s, %s", response.Status, response.StatusMessage)
return &captenpluginspb.AddCrossplanProviderResponse{
Status: captenpluginspb.StatusCode_INTERNAL_ERROR,
StatusMessage: "failed to add the Cluster CrossplanProvider",
Expand All @@ -58,7 +58,7 @@ func (s *Server) AddCrossplanProvider(ctx context.Context, request *captenplugin

func (s *Server) UpdateCrossplanProvider(ctx context.Context, request *captenpluginspb.UpdateCrossplanProviderRequest) (
*captenpluginspb.UpdateCrossplanProviderResponse, error) {
orgId, clusterId, err := validateOrgClusterWithArgs(ctx, request.Id, request.CloudType, request.CloudProviderId, request.ProviderName)
orgId, clusterId, err := validateOrgClusterWithArgs(ctx, request.Id, request.CloudType, request.CloudProviderId)
if err != nil {
s.log.Infof("request validation failed", err)
return &captenpluginspb.UpdateCrossplanProviderResponse{
Expand Down Expand Up @@ -89,7 +89,7 @@ func (s *Server) UpdateCrossplanProvider(ctx context.Context, request *captenplu
}

if response.Status != captenpluginspb.StatusCode_OK {
s.log.Errorf("failed to update the ClusterProject for cluster %s, %s, %s", response.Status, response.StatusMessage)
s.log.Errorf("failed to update the CrossplanProvider for cluster %s, %s, %s", response.Status, response.StatusMessage)
return &captenpluginspb.UpdateCrossplanProviderResponse{
Status: captenpluginspb.StatusCode_INTERNAL_ERROR,
StatusMessage: "failed to update the Cluster CrossplanProvider",
Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *Server) DeleteCrossplanProvider(ctx context.Context, request *captenplu
}

if response.Status != captenpluginspb.StatusCode_OK {
s.log.Errorf("failed to update the ClusterProject for cluster %s, %s, %s", response.Status, response.StatusMessage)
s.log.Errorf("failed to update the CrossplanProvider for cluster %s, %s, %s", response.Status, response.StatusMessage)
return &captenpluginspb.DeleteCrossplanProviderResponse{
Status: captenpluginspb.StatusCode_INTERNAL_ERROR,
StatusMessage: "failed to delete the Cluster CrossplanProvider",
Expand Down

0 comments on commit a0ffc19

Please sign in to comment.